From 9d74a5b723436895becd1e1f29c5f14991360018 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Fri, 4 Mar 2016 16:39:25 -0800 Subject: [PATCH 1/4] Scala implicit ordering fun -- have 2 forms of sketch, defaulting to the existing --- .../com/twitter/scalding/FileSource.scala | 8 +- .../twitter/scalding/typed/TypedPipe.scala | 88 +++++++++++++----- .../com/twitter/scalding/TypedPipeTest.scala | 92 +++++++++++++++++++ 3 files changed, 159 insertions(+), 29 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala index 90be06d75a..93cd91d3ed 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala @@ -428,13 +428,13 @@ abstract class FixedPathSource(path: String*) extends FileSource { override def equals(that: Any): Boolean = (that != null) && (that.toString == toString) /** - * Similar in behavior to {@link TimePathedSource.writePathFor}. - * Strip out the trailing slash star. - */ + * Similar in behavior to {@link TimePathedSource.writePathFor}. + * Strip out the trailing slash star. 
+ */ protected def stripTrailing(path: String): String = { assert(path != "*", "Path must not be *") assert(path != "/*", "Path must not be /*") - if(path.takeRight(2) == "/*") { + if (path.takeRight(2) == "/*") { path.dropRight(2) } else { path diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index f4da0f1e52..d73090250b 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -26,7 +26,7 @@ import com.twitter.algebird.{ Aggregator, Monoid, Semigroup } import com.twitter.scalding.TupleConverter.{ TupleEntryConverter, singleConverter, tuple2Converter } import com.twitter.scalding.TupleSetter.{ singleSetter, tup2Setter } import com.twitter.scalding._ -import com.twitter.scalding.serialization.OrderedSerialization +import com.twitter.scalding.serialization.{ OrderedSerialization, Serialization } import com.twitter.scalding.serialization.OrderedSerialization.Result import com.twitter.scalding.serialization.macros.impl.BinaryOrdering import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._ @@ -38,7 +38,7 @@ import scala.util.Try * This object is here rather than in the typed package because a lot of code was written using * the functions in the object, which we do not see how to hide with package object tricks. 
*/ -object TypedPipe extends Serializable { +object TypedPipe extends Serializable with LowerPriorityTypedPipeImplicits { import Dsl.flowDefToRichFlowDef /** @@ -119,6 +119,67 @@ object TypedPipe extends Serializable { override def hash(x: Int): Int = x } } + implicit class HigherPriorityTypedPipeMethods[T, K, V](val tp: TypedPipe[T])(implicit ev: TypedPipe[T] <:< TypedPipe[(K, V)], + orderedSerialization: OrderedSerialization[K]) { + // scope the imports nice and local + import com.twitter.scalding.typed.Sketched + import com.twitter.scalding.serialization.{ OrderedSerialization, Serialization } + /** + * Enables joining when this TypedPipe has some keys with many many values and + * but many with very few values. For instance, a graph where some nodes have + * millions of neighbors, but most have only a few. + * + * We build a (count-min) sketch of each key's frequency, and we use that + * to shard the heavy keys across many reducers. + * This increases communication cost in order to reduce the maximum time needed + * to complete the join. + * + * {@code pipe.sketch(100).join(thatPipe) } + * will add an extra map/reduce job over a standard join to create the count-min-sketch. + * This will generally only be beneficial if you have really heavy skew, where without + * this you have 1 or 2 reducers taking hours longer than the rest. 
+ */ + def sketch(reducers: Int, + eps: Double = 1.0E-5, //272k width = 1MB per row + delta: Double = 0.01, //5 rows (= 5 hashes) + seed: Int = 12345): Sketched[K, V] = + Sketched(ev(tp), reducers, delta, eps, seed)(Serialization.toBytes[K](_), orderedSerialization) + } + +} + +private[scalding] trait LowerPriorityTypedPipeImplicits { + implicit def toLowerPriorityTypedPipeMethods[T](tp: TypedPipe[T]) = + new LowerPriorityTypedPipeMethods(tp) +} +private[scalding] class LowerPriorityTypedPipeMethods[T](val tp: TypedPipe[T]) extends AnyVal { + // scope the imports nice and local + import com.twitter.scalding.typed.Sketched + import com.twitter.scalding.serialization.{ OrderedSerialization, Serialization } + + /** + * Enables joining when this TypedPipe has some keys with many many values and + * but many with very few values. For instance, a graph where some nodes have + * millions of neighbors, but most have only a few. + * + * We build a (count-min) sketch of each key's frequency, and we use that + * to shard the heavy keys across many reducers. + * This increases communication cost in order to reduce the maximum time needed + * to complete the join. + * + * {@code pipe.sketch(100).join(thatPipe) } + * will add an extra map/reduce job over a standard join to create the count-min-sketch. + * This will generally only be beneficial if you have really heavy skew, where without + * this you have 1 or 2 reducers taking hours longer than the rest. 
+ */ + def sketch[K, V](reducers: Int, + eps: Double = 1.0E-5, //272k width = 1MB per row + delta: Double = 0.01, //5 rows (= 5 hashes) + seed: Int = 12345)(implicit ev: TypedPipe[T] <:< TypedPipe[(K, V)], + serialization: K => Array[Byte], + ordering: Ordering[K]): Sketched[K, V] = + Sketched(ev(tp), reducers, delta, eps, seed) + } /** @@ -690,29 +751,6 @@ trait TypedPipe[+T] extends Serializable { .hashLeftJoin(grouped) .map { case (t, (_, optV)) => (t, optV) } - /** - * Enables joining when this TypedPipe has some keys with many many values and - * but many with very few values. For instance, a graph where some nodes have - * millions of neighbors, but most have only a few. - * - * We build a (count-min) sketch of each key's frequency, and we use that - * to shard the heavy keys across many reducers. - * This increases communication cost in order to reduce the maximum time needed - * to complete the join. - * - * {@code pipe.sketch(100).join(thatPipe) } - * will add an extra map/reduce job over a standard join to create the count-min-sketch. - * This will generally only be beneficial if you have really heavy skew, where without - * this you have 1 or 2 reducers taking hours longer than the rest. 
- */ - def sketch[K, V](reducers: Int, - eps: Double = 1.0E-5, //272k width = 1MB per row - delta: Double = 0.01, //5 rows (= 5 hashes) - seed: Int = 12345)(implicit ev: TypedPipe[T] <:< TypedPipe[(K, V)], - serialization: K => Array[Byte], - ordering: Ordering[K]): Sketched[K, V] = - Sketched(ev(this), reducers, delta, eps, seed) - /** * If any errors happen below this line, but before a groupBy, write to a TypedSink */ diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala index 703bd29bdf..76d852bc93 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala @@ -1348,6 +1348,25 @@ class TypedSketchJoinJob(args: Args) extends Job(args) { .write(TypedText.tsv[(Int, Int, Int)]("output-join")) } +class OrderedSerializationTypedSketchJoinJob(args: Args) extends Job(args) { + val zero = TypedPipe.from(TypedText.tsv[(Int, Int)]("input0")) + val one = TypedPipe.from(TypedText.tsv[(Int, Int)]("input1")) + + import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._ + + zero + .sketch(args("reducers").toInt) + .join(one) + .map{ case (k, (v0, v1)) => (k, v0, v1) } + .write(TypedText.tsv[(Int, Int, Int)]("output-sketch")) + + zero + .group + .join(one.group) + .map{ case (k, (v0, v1)) => (k, v0, v1) } + .write(TypedText.tsv[(Int, Int, Int)]("output-join")) +} + class TypedSketchLeftJoinJob(args: Args) extends Job(args) { val zero = TypedPipe.from(TypedText.tsv[(Int, Int)]("input0")) val one = TypedPipe.from(TypedText.tsv[(Int, Int)]("input1")) @@ -1367,6 +1386,25 @@ class TypedSketchLeftJoinJob(args: Args) extends Job(args) { .write(TypedText.tsv[(Int, Int, Int)]("output-join")) } +class OrderedSerializationTypedSketchLeftJoinJob(args: Args) extends Job(args) { + val zero = TypedPipe.from(TypedText.tsv[(Int, Int)]("input0")) + val one = 
TypedPipe.from(TypedText.tsv[(Int, Int)]("input1")) + + import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._ + + zero + .sketch(args("reducers").toInt) + .leftJoin(one) + .map{ case (k, (v0, v1)) => (k, v0, v1.getOrElse(-1)) } + .write(TypedText.tsv[(Int, Int, Int)]("output-sketch")) + + zero + .group + .leftJoin(one.group) + .map{ case (k, (v0, v1)) => (k, v0, v1.getOrElse(-1)) } + .write(TypedText.tsv[(Int, Int, Int)]("output-join")) +} + object TypedSketchJoinTestHelper { import Dsl._ @@ -1428,6 +1466,33 @@ class TypedSketchJoinJobTest extends WordSpec with Matchers { sk shouldBe inner } } + + "A TypedSketchJoinJob using OrderedSerialization" should { + "get the same result as an inner join" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 10, x => 1) + sk shouldBe inner + } + + "get the same result when half the left keys are missing" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 10, x => if (x < 50) 0 else 1) + sk shouldBe inner + } + + "get the same result with a massive skew to one key" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 10, x => if (x == 50) 1000 else 1) + sk shouldBe inner + } + + "still work with only one reducer" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 1, x => 1) + sk shouldBe inner + } + + "still work with massive skew and only one reducer" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 1, x => if (x == 50) 1000 else 1) + sk shouldBe inner + } + } } class TypedSketchLeftJoinJobTest extends WordSpec with Matchers { @@ -1460,4 +1525,31 @@ class TypedSketchLeftJoinJobTest extends WordSpec with Matchers { sk shouldBe inner } } + + "A OrderedSerialization TypedSketchLeftJoinJob" should { + "get the same result as a left join" in { + val (sk, left) = runJobWithArguments(new 
OrderedSerializationTypedSketchLeftJoinJob(_), 10, x => 1) + sk shouldBe left + } + + "get the same result when half the left keys are missing" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 10, x => if (x < 50) 0 else 1) + sk shouldBe inner + } + + "get the same result with a massive skew to one key" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 10, x => if (x == 50) 1000 else 1) + sk shouldBe inner + } + + "still work with only one reducer" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 1, x => 1) + sk shouldBe inner + } + + "still work with massive skew and only one reducer" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 1, x => if (x == 50) 1000 else 1) + sk shouldBe inner + } + } } From c23125aa33a904199b38e88173dcdeef47648e79 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Fri, 4 Mar 2016 16:59:08 -0800 Subject: [PATCH 2/4] Drop the T/ev --- .../main/scala/com/twitter/scalding/typed/TypedPipe.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index d73090250b..bbeec0a4eb 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -119,8 +119,7 @@ object TypedPipe extends Serializable with LowerPriorityTypedPipeImplicits { override def hash(x: Int): Int = x } } - implicit class HigherPriorityTypedPipeMethods[T, K, V](val tp: TypedPipe[T])(implicit ev: TypedPipe[T] <:< TypedPipe[(K, V)], - orderedSerialization: OrderedSerialization[K]) { + implicit class HigherPriorityTypedPipeMethods[K, V](val tp: TypedPipe[(K, V)])(implicit orderedSerialization: OrderedSerialization[K]) { // scope the 
imports nice and local import com.twitter.scalding.typed.Sketched import com.twitter.scalding.serialization.{ OrderedSerialization, Serialization } @@ -143,7 +142,7 @@ object TypedPipe extends Serializable with LowerPriorityTypedPipeImplicits { eps: Double = 1.0E-5, //272k width = 1MB per row delta: Double = 0.01, //5 rows (= 5 hashes) seed: Int = 12345): Sketched[K, V] = - Sketched(ev(tp), reducers, delta, eps, seed)(Serialization.toBytes[K](_), orderedSerialization) + Sketched(tp, reducers, delta, eps, seed)(Serialization.toBytes[K](_), orderedSerialization) } } From 86e6b8310e83320c02ed7737f1d2c424586d49fc Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Fri, 4 Mar 2016 17:10:02 -0800 Subject: [PATCH 3/4] Simplify it down to anon classes gated on the right types --- .../twitter/scalding/typed/TypedPipe.scala | 63 ++++++------------- 1 file changed, 18 insertions(+), 45 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index bbeec0a4eb..09f717dda8 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -38,7 +38,7 @@ import scala.util.Try * This object is here rather than in the typed package because a lot of code was written using * the functions in the object, which we do not see how to hide with package object tricks. 
*/ -object TypedPipe extends Serializable with LowerPriorityTypedPipeImplicits { +object TypedPipe extends Serializable { import Dsl.flowDefToRichFlowDef /** @@ -119,43 +119,6 @@ object TypedPipe extends Serializable with LowerPriorityTypedPipeImplicits { override def hash(x: Int): Int = x } } - implicit class HigherPriorityTypedPipeMethods[K, V](val tp: TypedPipe[(K, V)])(implicit orderedSerialization: OrderedSerialization[K]) { - // scope the imports nice and local - import com.twitter.scalding.typed.Sketched - import com.twitter.scalding.serialization.{ OrderedSerialization, Serialization } - /** - * Enables joining when this TypedPipe has some keys with many many values and - * but many with very few values. For instance, a graph where some nodes have - * millions of neighbors, but most have only a few. - * - * We build a (count-min) sketch of each key's frequency, and we use that - * to shard the heavy keys across many reducers. - * This increases communication cost in order to reduce the maximum time needed - * to complete the join. - * - * {@code pipe.sketch(100).join(thatPipe) } - * will add an extra map/reduce job over a standard join to create the count-min-sketch. - * This will generally only be beneficial if you have really heavy skew, where without - * this you have 1 or 2 reducers taking hours longer than the rest. 
- */ - def sketch(reducers: Int, - eps: Double = 1.0E-5, //272k width = 1MB per row - delta: Double = 0.01, //5 rows (= 5 hashes) - seed: Int = 12345): Sketched[K, V] = - Sketched(tp, reducers, delta, eps, seed)(Serialization.toBytes[K](_), orderedSerialization) - } - -} - -private[scalding] trait LowerPriorityTypedPipeImplicits { - implicit def toLowerPriorityTypedPipeMethods[T](tp: TypedPipe[T]) = - new LowerPriorityTypedPipeMethods(tp) -} -private[scalding] class LowerPriorityTypedPipeMethods[T](val tp: TypedPipe[T]) extends AnyVal { - // scope the imports nice and local - import com.twitter.scalding.typed.Sketched - import com.twitter.scalding.serialization.{ OrderedSerialization, Serialization } - /** * Enables joining when this TypedPipe has some keys with many many values and * but many with very few values. For instance, a graph where some nodes have @@ -171,13 +134,23 @@ private[scalding] class LowerPriorityTypedPipeMethods[T](val tp: TypedPipe[T]) e * This will generally only be beneficial if you have really heavy skew, where without * this you have 1 or 2 reducers taking hours longer than the rest. 
*/ - def sketch[K, V](reducers: Int, - eps: Double = 1.0E-5, //272k width = 1MB per row - delta: Double = 0.01, //5 rows (= 5 hashes) - seed: Int = 12345)(implicit ev: TypedPipe[T] <:< TypedPipe[(K, V)], - serialization: K => Array[Byte], - ordering: Ordering[K]): Sketched[K, V] = - Sketched(ev(tp), reducers, delta, eps, seed) + + implicit def toOrderedSerializionSketchMethod[K, V](tp: TypedPipe[(K, V)])(implicit orderedSerialization: OrderedSerialization[K]) = new { + def sketch(reducers: Int, + eps: Double = 1.0E-5, //272k width = 1MB per row + delta: Double = 0.01, //5 rows (= 5 hashes) + seed: Int = 12345): Sketched[K, V] = + Sketched(tp, reducers, delta, eps, seed)(Serialization.toBytes[K](_), orderedSerialization) + } + + implicit def toOrderingPlusFnSketchMethod[K, V](tp: TypedPipe[(K, V)])(implicit serialization: K => Array[Byte], + ordering: Ordering[K]) = new { + def sketch(reducers: Int, + eps: Double = 1.0E-5, //272k width = 1MB per row + delta: Double = 0.01, //5 rows (= 5 hashes) + seed: Int = 12345): Sketched[K, V] = + Sketched(tp, reducers, delta, eps, seed) + } } From fca47de494ce7dd3b8102db81f46b5ec23ed31e1 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Fri, 4 Mar 2016 17:16:09 -0800 Subject: [PATCH 4/4] Add priority back to avoid ambiguous case --- .../twitter/scalding/typed/TypedPipe.scala | 12 ++++++----- .../com/twitter/scalding/TypedPipeTest.scala | 21 +++++++++++++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index 09f717dda8..e9797584f9 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -38,7 +38,7 @@ import scala.util.Try * This object is here rather than in the typed package because a lot of code was written using * the functions in the object, which we 
do not see how to hide with package object tricks. */ -object TypedPipe extends Serializable { +object TypedPipe extends Serializable with LowerPriorityTypedPipeEnrichements { import Dsl.flowDefToRichFlowDef /** @@ -143,15 +143,17 @@ object TypedPipe extends Serializable { Sketched(tp, reducers, delta, eps, seed)(Serialization.toBytes[K](_), orderedSerialization) } - implicit def toOrderingPlusFnSketchMethod[K, V](tp: TypedPipe[(K, V)])(implicit serialization: K => Array[Byte], - ordering: Ordering[K]) = new { +} + +trait LowerPriorityTypedPipeEnrichements { + implicit def toOrderingPlusFnSketchMethod[K, V](tp: TypedPipe[(K, V)]) = new { def sketch(reducers: Int, eps: Double = 1.0E-5, //272k width = 1MB per row delta: Double = 0.01, //5 rows (= 5 hashes) - seed: Int = 12345): Sketched[K, V] = + seed: Int = 12345)(implicit serialization: K => Array[Byte], + ordering: Ordering[K]): Sketched[K, V] = Sketched(tp, reducers, delta, eps, seed) } - } /** diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala index 76d852bc93..d7be9b06e8 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala @@ -1405,6 +1405,27 @@ class OrderedSerializationTypedSketchLeftJoinJob(args: Args) extends Job(args) { .write(TypedText.tsv[(Int, Int, Int)]("output-join")) } +class BothAvailableSketchMethodsTypedSketchLeftJoinJob(args: Args) extends Job(args) { + val zero = TypedPipe.from(TypedText.tsv[(Int, Int)]("input0")) + val one = TypedPipe.from(TypedText.tsv[(Int, Int)]("input1")) + + import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._ + implicit def serialize(k: Int) = k.toString.getBytes + + zero + .sketch(args("reducers").toInt) + .leftJoin(one) + .map{ case (k, (v0, v1)) => (k, v0, v1.getOrElse(-1)) } + .write(TypedText.tsv[(Int, Int, Int)]("output-sketch")) + + zero + 
.group + .leftJoin(one.group) + .map{ case (k, (v0, v1)) => (k, v0, v1.getOrElse(-1)) } + .write(TypedText.tsv[(Int, Int, Int)]("output-join")) +} + + object TypedSketchJoinTestHelper { import Dsl._