diff --git a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala index 90be06d75a..93cd91d3ed 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala @@ -428,13 +428,13 @@ abstract class FixedPathSource(path: String*) extends FileSource { override def equals(that: Any): Boolean = (that != null) && (that.toString == toString) /** - * Similar in behavior to {@link TimePathedSource.writePathFor}. - * Strip out the trailing slash star. - */ + * Similar in behavior to {@link TimePathedSource.writePathFor}. + * Strip out the trailing slash star. + */ protected def stripTrailing(path: String): String = { assert(path != "*", "Path must not be *") assert(path != "/*", "Path must not be /*") - if(path.takeRight(2) == "/*") { + if (path.takeRight(2) == "/*") { path.dropRight(2) } else { path diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index f4da0f1e52..e9797584f9 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -26,7 +26,7 @@ import com.twitter.algebird.{ Aggregator, Monoid, Semigroup } import com.twitter.scalding.TupleConverter.{ TupleEntryConverter, singleConverter, tuple2Converter } import com.twitter.scalding.TupleSetter.{ singleSetter, tup2Setter } import com.twitter.scalding._ -import com.twitter.scalding.serialization.OrderedSerialization +import com.twitter.scalding.serialization.{ OrderedSerialization, Serialization } import com.twitter.scalding.serialization.OrderedSerialization.Result import com.twitter.scalding.serialization.macros.impl.BinaryOrdering import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._ @@ -38,7 +38,7 @@ import 
scala.util.Try * This object is here rather than in the typed package because a lot of code was written using * the functions in the object, which we do not see how to hide with package object tricks. */ -object TypedPipe extends Serializable { +object TypedPipe extends Serializable with LowerPriorityTypedPipeEnrichements { import Dsl.flowDefToRichFlowDef /** @@ -119,6 +119,41 @@ object TypedPipe extends Serializable { override def hash(x: Int): Int = x } } + /** + * Enables joining when this TypedPipe has some keys with many, many values + * but many with very few values. For instance, a graph where some nodes have + * millions of neighbors, but most have only a few. + * + * We build a (count-min) sketch of each key's frequency, and we use that + * to shard the heavy keys across many reducers. + * This increases communication cost in order to reduce the maximum time needed + * to complete the join. + * + * {@code pipe.sketch(100).join(thatPipe) } + * will add an extra map/reduce job over a standard join to create the count-min-sketch. + * This will generally only be beneficial if you have really heavy skew, where without + * this you have 1 or 2 reducers taking hours longer than the rest. 
+ */ + + implicit def toOrderedSerializionSketchMethod[K, V](tp: TypedPipe[(K, V)])(implicit orderedSerialization: OrderedSerialization[K]) = new { + def sketch(reducers: Int, + eps: Double = 1.0E-5, //272k width = 1MB per row + delta: Double = 0.01, //5 rows (= 5 hashes) + seed: Int = 12345): Sketched[K, V] = + Sketched(tp, reducers, delta, eps, seed)(Serialization.toBytes[K](_), orderedSerialization) + } + +} + +trait LowerPriorityTypedPipeEnrichements { + implicit def toOrderingPlusFnSketchMethod[K, V](tp: TypedPipe[(K, V)]) = new { + def sketch(reducers: Int, + eps: Double = 1.0E-5, //272k width = 1MB per row + delta: Double = 0.01, //5 rows (= 5 hashes) + seed: Int = 12345)(implicit serialization: K => Array[Byte], + ordering: Ordering[K]): Sketched[K, V] = + Sketched(tp, reducers, delta, eps, seed) + } } /** @@ -690,29 +725,6 @@ trait TypedPipe[+T] extends Serializable { .hashLeftJoin(grouped) .map { case (t, (_, optV)) => (t, optV) } - /** - * Enables joining when this TypedPipe has some keys with many many values and - * but many with very few values. For instance, a graph where some nodes have - * millions of neighbors, but most have only a few. - * - * We build a (count-min) sketch of each key's frequency, and we use that - * to shard the heavy keys across many reducers. - * This increases communication cost in order to reduce the maximum time needed - * to complete the join. - * - * {@code pipe.sketch(100).join(thatPipe) } - * will add an extra map/reduce job over a standard join to create the count-min-sketch. - * This will generally only be beneficial if you have really heavy skew, where without - * this you have 1 or 2 reducers taking hours longer than the rest. 
- */ - def sketch[K, V](reducers: Int, - eps: Double = 1.0E-5, //272k width = 1MB per row - delta: Double = 0.01, //5 rows (= 5 hashes) - seed: Int = 12345)(implicit ev: TypedPipe[T] <:< TypedPipe[(K, V)], - serialization: K => Array[Byte], - ordering: Ordering[K]): Sketched[K, V] = - Sketched(ev(this), reducers, delta, eps, seed) - /** * If any errors happen below this line, but before a groupBy, write to a TypedSink */ diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala index 703bd29bdf..d7be9b06e8 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala @@ -1348,6 +1348,25 @@ class TypedSketchJoinJob(args: Args) extends Job(args) { .write(TypedText.tsv[(Int, Int, Int)]("output-join")) } +class OrderedSerializationTypedSketchJoinJob(args: Args) extends Job(args) { + val zero = TypedPipe.from(TypedText.tsv[(Int, Int)]("input0")) + val one = TypedPipe.from(TypedText.tsv[(Int, Int)]("input1")) + + import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._ + + zero + .sketch(args("reducers").toInt) + .join(one) + .map{ case (k, (v0, v1)) => (k, v0, v1) } + .write(TypedText.tsv[(Int, Int, Int)]("output-sketch")) + + zero + .group + .join(one.group) + .map{ case (k, (v0, v1)) => (k, v0, v1) } + .write(TypedText.tsv[(Int, Int, Int)]("output-join")) +} + class TypedSketchLeftJoinJob(args: Args) extends Job(args) { val zero = TypedPipe.from(TypedText.tsv[(Int, Int)]("input0")) val one = TypedPipe.from(TypedText.tsv[(Int, Int)]("input1")) @@ -1367,6 +1386,46 @@ class TypedSketchLeftJoinJob(args: Args) extends Job(args) { .write(TypedText.tsv[(Int, Int, Int)]("output-join")) } +class OrderedSerializationTypedSketchLeftJoinJob(args: Args) extends Job(args) { + val zero = TypedPipe.from(TypedText.tsv[(Int, Int)]("input0")) + val one = 
TypedPipe.from(TypedText.tsv[(Int, Int)]("input1")) + + import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._ + + zero + .sketch(args("reducers").toInt) + .leftJoin(one) + .map{ case (k, (v0, v1)) => (k, v0, v1.getOrElse(-1)) } + .write(TypedText.tsv[(Int, Int, Int)]("output-sketch")) + + zero + .group + .leftJoin(one.group) + .map{ case (k, (v0, v1)) => (k, v0, v1.getOrElse(-1)) } + .write(TypedText.tsv[(Int, Int, Int)]("output-join")) +} + +class BothAvailableSketchMethodsTypedSketchLeftJoinJob(args: Args) extends Job(args) { + val zero = TypedPipe.from(TypedText.tsv[(Int, Int)]("input0")) + val one = TypedPipe.from(TypedText.tsv[(Int, Int)]("input1")) + + import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._ + implicit def serialize(k: Int) = k.toString.getBytes + + zero + .sketch(args("reducers").toInt) + .leftJoin(one) + .map{ case (k, (v0, v1)) => (k, v0, v1.getOrElse(-1)) } + .write(TypedText.tsv[(Int, Int, Int)]("output-sketch")) + + zero + .group + .leftJoin(one.group) + .map{ case (k, (v0, v1)) => (k, v0, v1.getOrElse(-1)) } + .write(TypedText.tsv[(Int, Int, Int)]("output-join")) +} + + object TypedSketchJoinTestHelper { import Dsl._ @@ -1428,6 +1487,33 @@ class TypedSketchJoinJobTest extends WordSpec with Matchers { sk shouldBe inner } } + + "A TypedSketchJoinJob using OrderedSerialization" should { + "get the same result as an inner join" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 10, x => 1) + sk shouldBe inner + } + + "get the same result when half the left keys are missing" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 10, x => if (x < 50) 0 else 1) + sk shouldBe inner + } + + "get the same result with a massive skew to one key" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 10, x => if (x == 50) 1000 else 1) + sk shouldBe inner + } + + "still work with only one reducer" in { + 
val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 1, x => 1) + sk shouldBe inner + } + + "still work with massive skew and only one reducer" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchJoinJob(_), 1, x => if (x == 50) 1000 else 1) + sk shouldBe inner + } + } } class TypedSketchLeftJoinJobTest extends WordSpec with Matchers { @@ -1460,4 +1546,31 @@ class TypedSketchLeftJoinJobTest extends WordSpec with Matchers { sk shouldBe inner } } + + "A OrderedSerialization TypedSketchLeftJoinJob" should { + "get the same result as a left join" in { + val (sk, left) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 10, x => 1) + sk shouldBe left + } + + "get the same result when half the left keys are missing" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 10, x => if (x < 50) 0 else 1) + sk shouldBe inner + } + + "get the same result with a massive skew to one key" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 10, x => if (x == 50) 1000 else 1) + sk shouldBe inner + } + + "still work with only one reducer" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 1, x => 1) + sk shouldBe inner + } + + "still work with massive skew and only one reducer" in { + val (sk, inner) = runJobWithArguments(new OrderedSerializationTypedSketchLeftJoinJob(_), 1, x => if (x == 50) 1000 else 1) + sk shouldBe inner + } + } }