diff --git a/project/Build.scala b/project/Build.scala index d3e4a80a88..7a2fef4fd5 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -39,6 +39,7 @@ object ScaldingBuild extends Build { val scalaTestVersion = "2.2.2" val scalameterVersion = "0.6" val scroogeVersion = "3.17.0" + val shapelessVersion = "2.1.0" val slf4jVersion = "1.6.6" val thriftVersion = "0.5.0" @@ -269,6 +270,16 @@ object ScaldingBuild extends Build { "org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided", "org.slf4j" % "slf4j-api" % slf4jVersion, "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "provided" + ) ++ ( + if(isScala210x(scalaVersion.value)) { + Seq( + "com.chuusai" % "shapeless_2.10.4" % shapelessVersion + ) + } else { + Seq( + "com.chuusai" %% "shapeless" % shapelessVersion + ) + } ) ).dependsOn(scaldingArgs, scaldingDate, maple) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TupleConverter.scala b/scalding-core/src/main/scala/com/twitter/scalding/TupleConverter.scala index 0a0f2c12d9..ad2b92f8a8 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/TupleConverter.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/TupleConverter.scala @@ -18,8 +18,10 @@ package com.twitter.scalding import cascading.tuple.TupleEntry import cascading.tuple.{ Tuple => CTuple } +import shapeless._ +import shapeless.ops.nat._ -import scala.collection.breakOut +import scala.collection.{GenTraversable, breakOut} /** * Typeclass to represent converting from cascading TupleEntry to some type T. @@ -51,6 +53,37 @@ trait LowPriorityTupleConverters extends java.io.Serializable { } } +/** + * Based on `FromTraversable` from shapeless + */ +trait FromTupleEntry[Out <: HList, I <: Nat] { + def apply(l : TupleEntry) : Option[Out] +} + +object FromTupleEntry { + def apply[Out <: HList](implicit from: FromTupleEntry[Out, Nat._0]) = from + + implicit def hnilFromTupleEntry[T, I <: Nat](implicit toIntI : ToInt[I]) = + new FromTupleEntry[HNil, I] { + def apply(te : TupleEntry) = + if(te.size() == toIntI()) Some(HNil) else None + } + + implicit def hlistFromTupleEntry[OutH, OutT <: HList, I <: Nat] + (implicit + flt : FromTupleEntry[OutT, Succ[I]], + g : TupleGetter[OutH], + toIntI : ToInt[I]) = + new FromTupleEntry[OutH :: OutT, I] { + def apply(te : TupleEntry) : Option[OutH :: OutT] = + if(te.size() <= toIntI()) None + else for( + h <- new Some(g.get(te.getTuple, toIntI())); + t <- flt(te) + ) yield h :: t + } +} + object TupleConverter extends GeneratedTupleConverters { /** * Treat this TupleConverter as one for a superclass @@ -67,6 +100,23 @@ object TupleConverter extends GeneratedTupleConverters { def arity[T](implicit tc: TupleConverter[T]): Int = tc.arity def of[T](implicit tc: TupleConverter[T]): TupleConverter[T] = tc + import shapeless.ops.hlist._ + + implicit def hListConverter[H, T <: HList, N <: Nat] + (implicit + g: TupleGetter[H], + len: Length.Aux[H :: T, N], + toIntN : ToInt[N], + fl : FromTupleEntry[H :: T, Nat._0]): TupleConverter[H :: T] = + new TupleConverter[H :: T] { + override def apply(te: TupleEntry): H :: T = { + val l : Option[H :: T] = fl(te) + l.get + } + + override def arity: Int = toIntN() + } + /** * Copies the tupleEntry, since cascading may change it after the end of an * operation (and it is not safe to assume the consumer has not kept a ref diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TupleSetter.scala b/scalding-core/src/main/scala/com/twitter/scalding/TupleSetter.scala index f822b6b271..1329b02a5c 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/TupleSetter.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/TupleSetter.scala @@ -65,6 +65,21 @@ object TupleSetter extends GeneratedTupleSetters { def arity[T](implicit ts: TupleSetter[T]): Int = ts.arity def of[T](implicit ts: TupleSetter[T]): TupleSetter[T] = ts + import shapeless.ops.hlist._ + import shapeless.ops.nat._ + import shapeless._ + + implicit def hListSetter[L <: HList, N <: Nat, T <: Any] + (implicit len: Length.Aux[L, N], ti: ToInt[N], tl: ToList[L, T]) = new TupleSetter[L] { + + override def apply(arg: L): CTuple = { + val list = arg.toList[T].map(_.asInstanceOf[Object]) + new CTuple(list:_*) + } + + override def arity: Int = ti() + } + //This is here for handling functions that return cascading tuples: implicit lazy val CTupleSetter: TupleSetter[CTuple] = new TupleSetter[CTuple] { override def apply(arg: CTuple) = new CTuple(arg) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TupleTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TupleTest.scala index 7d3b2f2ac1..b9af9d35b0 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/TupleTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/TupleTest.scala @@ -18,6 +18,7 @@ package com.twitter.scalding import cascading.tuple.{ TupleEntry, Tuple => CTuple } import org.scalatest.{ Matchers, WordSpec } +import shapeless._ class TupleTest extends WordSpec with Matchers { def get[T](ctup: CTuple)(implicit tc: TupleConverter[T]) = tc(new TupleEntry(ctup)) @@ -62,6 +63,23 @@ class TupleTest extends WordSpec with Matchers { arityConvMatches((2, 3), 2) shouldBe true aritySetMatches((2, 3), 2) shouldBe true } + "get primitives out of cascading tuples using HLists" in { + val ctup = new CTuple("hey", new java.lang.Long(2), new java.lang.Integer(3)) + get[String :: Long :: Int :: HNil](ctup) shouldBe "hey" :: 2L :: 3 :: HNil + + roundTrip[Int :: HNil](3 :: HNil) shouldBe true + arityConvMatches(3 :: HNil, 1) shouldBe true + aritySetMatches(3 :: HNil, 1) shouldBe true + roundTrip[Long :: HNil](42L :: HNil) shouldBe true + arityConvMatches(42L :: HNil, 1) shouldBe true + aritySetMatches(42L :: HNil, 1) shouldBe true + roundTrip[String :: HNil]("hey" :: HNil) shouldBe true + arityConvMatches("hey" :: HNil, 1) shouldBe true + aritySetMatches("hey" :: HNil, 1) shouldBe true + roundTrip[Int :: Int :: HNil](4 :: 2 :: HNil) shouldBe true + arityConvMatches(2 :: 3 :: HNil, 2) shouldBe true + aritySetMatches(2 :: 3 :: HNil, 2) shouldBe true + } "get non-primitives out of cascading tuples" in { val ctup = new CTuple(None, List(1, 2, 3), 1 -> 2) get[(Option[Int], List[Int], (Int, Int))](ctup) shouldBe (None, List(1, 2, 3), 1 -> 2) @@ -75,6 +93,19 @@ class TupleTest extends WordSpec with Matchers { arityConvMatches(List(1, 2, 3), 1) shouldBe true aritySetMatches(List(1, 2, 3), 1) shouldBe true } + "get non-primitives out of cascading tuples using HLists" in { + val ctup = new CTuple(None, List(1, 2, 3), 1 -> 2) + get[Option[Int] :: List[Int] :: (Int, Int) :: HNil](ctup) shouldBe None :: List(1, 2, 3) :: 1 -> 2 :: HNil + + roundTrip[Option[Int] :: List[Int] :: HNil](Some(1) :: List() :: HNil) shouldBe true + arityConvMatches(None :: Nil :: HNil, 2) shouldBe true + aritySetMatches(None :: Nil :: HNil, 2) shouldBe true + + arityConvMatches(None :: HNil, 1) shouldBe true + aritySetMatches(None :: HNil, 1) shouldBe true + arityConvMatches(List(1, 2, 3) :: HNil, 1) shouldBe true + aritySetMatches(List(1, 2, 3) :: HNil, 1) shouldBe true + } "deal with AnyRef" in { val ctup = new CTuple(None, List(1, 2, 3), 1 -> 2) get[(AnyRef, AnyRef, AnyRef)](ctup) shouldBe (None, List(1, 2, 3), 1 -> 2) @@ -85,5 +116,15 @@ class TupleTest extends WordSpec with Matchers { arityConvMatches[(AnyRef, AnyRef)](("hey", "you"), 2) shouldBe true aritySetMatches[(AnyRef, AnyRef)](("hey", "you"), 2) shouldBe true } + "deal with AnyRef using HLists" in { + val ctup = new CTuple(None, List(1, 2, 3), 1 -> 2) + get[AnyRef :: AnyRef :: AnyRef :: HNil](ctup) shouldBe None :: List(1, 2, 3) :: 1 -> 2 :: HNil + get[AnyRef :: HNil](new CTuple("you")) shouldBe "you" :: HNil + + roundTrip[AnyRef :: HNil]("hey" :: HNil) shouldBe true + roundTrip[AnyRef :: AnyRef :: HNil](Nil :: Nil :: HNil) shouldBe true + arityConvMatches[AnyRef :: AnyRef :: HNil]("hey" :: "you" :: HNil, 2) shouldBe true + aritySetMatches[AnyRef :: AnyRef :: HNil]("hey" :: "you" :: HNil, 2) shouldBe true + } } }