Stop offering unsafe implicit conversions from arbitrary collections to Fields.

  * FieldConversions.scala: parseAnySeqToFields moves into LowPriorityFieldConversions and is
    no longer implicit; fields, strFields and intFields are no longer implicit; fieldFields now
    declares Fields as its result type; productToFields sends List arguments through
    parseAnySeqToFields; fieldsToRichFields becomes a single if/else expression; RichFields
    drops the redundant `val` on its case-class parameter. A new DeprecatedFieldConversions
    trait/object re-exposes the TraversableOnce[Any] conversion as travOnceToFields for code
    that opts in.
  * ReduceOperations.scala, Combinatorics.scala: call fields(...) explicitly where a
    Seq[Symbol] used to be converted implicitly.
  * FieldImpsTest.scala: imports DeprecatedFieldConversions._.

parseAnySeqToFields materializes its argument once and reuses that sequence both for building
the Fields and for registering comparators, so single-pass inputs (e.g. an Iterator) keep the
comparators of any Field[_] elements.

NOTE(review): indentation and blank context lines were reconstructed from the hunk line counts
and 2-space Scala style; if a context line does not match the tree, apply with
`git apply --ignore-whitespace` and confirm. The post-image blob id on the first `index` line
predates the parseAnySeqToFields change above.

diff --git a/scalding-core/src/main/scala/com/twitter/scalding/FieldConversions.scala b/scalding-core/src/main/scala/com/twitter/scalding/FieldConversions.scala
index bcc02e4ce1..d6426266c0 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/FieldConversions.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/FieldConversions.scala
@@ -23,7 +23,32 @@ import java.util.Comparator
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 
+/**
+ * These are particularly unsafe and we don't recommend using them
+ */
+trait DeprecatedFieldConversions extends LowPriorityFieldConversions {
+  implicit def travOnceToFields[T](t: TraversableOnce[Any]): Fields =
+    parseAnySeqToFields(t)
+}
+
+object DeprecatedFieldConversions extends DeprecatedFieldConversions
+
 trait LowPriorityFieldConversions {
+  /**
+   * Useful to convert f : Any* to Fields. This handles mixed cases ("hey", 'you).
+   * Not sure we should be this flexible, but given that Cascading will throw an
+   * exception before scheduling the job, I guess this is okay.
+   */
+  def parseAnySeqToFields[T <: TraversableOnce[Any]](anyf: T): Fields = {
+    // A TraversableOnce (e.g. an Iterator) can only be walked once, so materialize it first.
+    val elems = anyf.toSeq
+    val fields = new Fields(elems.map(anyToFieldArg): _*)
+    elems.foreach {
+      case field: Field[_] => fields.setComparator(field.id, field.ord)
+      case _ => ()
+    }
+    fields
+  }
 
   protected def anyToFieldArg(f: Any): Comparable[_] = f match {
     case x: Symbol => x.name
@@ -49,13 +74,18 @@ trait LowPriorityFieldConversions {
    * Lists are handled by an implicit in FieldConversions, which have
    * higher priority.
    */
-  implicit def productToFields(f: Product): Fields = {
-    val fields = new Fields(f.productIterator.map { anyToFieldArg }.toSeq: _*)
-    f.productIterator.foreach {
-      case field: Field[_] => fields.setComparator(field.id, field.ord)
-      case _ =>
-    }
-    fields
+  implicit def productToFields(f: Product): Fields = f match {
+    case l: List[_] =>
+      // List is a common Product; unfortunately this is a very dangerous
+      // implicit since so many things extend Product...
+      parseAnySeqToFields(l)
+    case _ =>
+      val fields = new Fields(f.productIterator.map(anyToFieldArg).toSeq: _*)
+      f.productIterator.foreach {
+        case field: Field[_] => fields.setComparator(field.id, field.ord)
+        case _ => ()
+      }
+      fields
   }
 }
 
@@ -169,25 +199,12 @@ trait FieldConversions extends LowPriorityFieldConversions {
   implicit def fromEnum[T <: Enumeration](enumeration: T): Fields =
     new Fields(enumeration.values.toList.map { _.toString }: _*)
 
-  implicit def fields[T <: TraversableOnce[Symbol]](f: T): Fields = new Fields(f.toSeq.map(_.name): _*)
-  implicit def strFields[T <: TraversableOnce[String]](f: T): Fields = new Fields(f.toSeq: _*)
-  implicit def intFields[T <: TraversableOnce[Int]](f: T): Fields = {
-    new Fields(f.toSeq.map { new java.lang.Integer(_) }: _*)
-  }
-  implicit def fieldFields[T <: TraversableOnce[Field[_]]](f: T): RichFields = RichFields(f.toSeq)
-  /**
-   * Useful to convert f : Any* to Fields. This handles mixed cases ("hey", 'you).
-   * Not sure we should be this flexible, but given that Cascading will throw an
-   * exception before scheduling the job, I guess this is okay.
-   */
-  implicit def parseAnySeqToFields[T <: TraversableOnce[Any]](anyf: T): Fields = {
-    val fields = new Fields(anyf.toSeq.map { anyToFieldArg }: _*)
-    anyf.foreach {
-      case field: Field[_] => fields.setComparator(field.id, field.ord)
-      case _ =>
-    }
-    fields
-  }
+  def fields[T <: TraversableOnce[Symbol]](f: T): Fields = new Fields(f.toSeq.map(_.name): _*)
+  def strFields[T <: TraversableOnce[String]](f: T): Fields = new Fields(f.toSeq: _*)
+  def intFields[T <: TraversableOnce[Int]](f: T): Fields =
+    new Fields(f.toSeq.map(Integer.valueOf): _*)
+
+  implicit def fieldFields[T <: TraversableOnce[Field[_]]](f: T): Fields = RichFields(f.toSeq)
 
   //Handle a pair generally:
   implicit def tuple2ToFieldsPair[T, U](pair: (T, U))(implicit tf: T => Fields, uf: U => Fields): (Fields, Fields) = {
@@ -199,7 +216,7 @@ trait FieldConversions extends LowPriorityFieldConversions {
    * We can't set the field Manifests because cascading doesn't (yet) expose field type information
    * in the Fields API.
    */
-  implicit def fieldsToRichFields(fields: Fields): RichFields = {
+  implicit def fieldsToRichFields(fields: Fields): RichFields =
     if (!fields.isDefined) {
       // TODO We could provide a reasonable conversion here by designing a rich type hierarchy such as
       // Fields
@@ -211,20 +228,21 @@ trait FieldConversions extends LowPriorityFieldConversions {
       // of the appropriate type
       sys.error("virtual Fields cannot be converted to RichFields")
     }
+    else {
 
-    // This bit is kludgy because cascading provides different interfaces for extracting
-    // IDs and Comparators from a Fields instance. (The IDs are only available
-    // "one at a time" by querying for a specific index, while the Comparators are only
-    // available "all at once" by calling getComparators.)
+      // This bit is kludgy because cascading provides different interfaces for extracting
+      // IDs and Comparators from a Fields instance. (The IDs are only available
+      // "one at a time" by querying for a specific index, while the Comparators are only
+      // available "all at once" by calling getComparators.)
 
-    new RichFields(asList(fields).zip(fields.getComparators).map {
-      case (id: Comparable[_], comparator: Comparator[_]) => id match {
-        case x: java.lang.Integer => IntField(x)(Ordering.comparatorToOrdering(comparator), None)
-        case y: String => StringField(y)(Ordering.comparatorToOrdering(comparator), None)
-        case z => sys.error("not expecting object of type " + z.getClass + " as field name")
-      }
-    })
-  }
+      new RichFields(asList(fields).zip(fields.getComparators).map {
+        case (id: Comparable[_], comparator: Comparator[_]) => id match {
+          case x: java.lang.Integer => IntField(x)(Ordering.comparatorToOrdering(comparator), None)
+          case y: String => StringField(y)(Ordering.comparatorToOrdering(comparator), None)
+          case z => sys.error("not expecting object of type " + z.getClass + " as field name")
+        }
+      })
+    }
 
 }
 
@@ -234,7 +252,7 @@ trait FieldConversions extends LowPriorityFieldConversions {
 // val myFields: Fields = ...
 // myFields.toFieldList
 
-case class RichFields(val toFieldList: List[Field[_]]) extends Fields(toFieldList.map { _.id }: _*) {
+case class RichFields(toFieldList: List[Field[_]]) extends Fields(toFieldList.map { _.id }: _*) {
   toFieldList.foreach { field: Field[_] => setComparator(field.id, field.ord) }
 }
 
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/ReduceOperations.scala b/scalding-core/src/main/scala/com/twitter/scalding/ReduceOperations.scala
index 7645cac59c..ab990b1b1a 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/ReduceOperations.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/ReduceOperations.scala
@@ -193,13 +193,13 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
     //CTuple's have unknown arity so we have to put them into a Tuple1 in the middle phase:
     mapReduceMap(fd) { ctuple: CTuple => Tuple1(ctuple) } { (oldVal, newVal) => oldVal } { result => result._1 }
   }
-  def head(f: Symbol*): Self = head(f -> f)
+  def head(f: Symbol*): Self = head(fields(f) -> fields(f))
 
   def last(fd: (Fields, Fields)) = {
     //CTuple's have unknown arity so we have to put them into a Tuple1 in the middle phase:
     mapReduceMap(fd) { ctuple: CTuple => Tuple1(ctuple) } { (oldVal, newVal) => newVal } { result => result._1 }
   }
-  def last(f: Symbol*): Self = last(f -> f)
+  def last(f: Symbol*): Self = last(fields(f) -> fields(f))
 
   /**
    * Collect all the values into a List[T] and then operate on that
@@ -239,9 +239,9 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
     mapReduceMap(fieldDef) { ctuple: CTuple => Tuple1(ctuple) } { (oldVal, newVal) => if (select(oldVal._1, newVal._1)) oldVal else newVal } { result => result._1 }
   }
   def max(fieldDef: (Fields, Fields)) = extremum(true, fieldDef)
-  def max(f: Symbol*) = extremum(true, (f -> f))
+  def max(f: Symbol*) = extremum(true, (fields(f) -> fields(f)))
   def min(fieldDef: (Fields, Fields)) = extremum(false, fieldDef)
-  def min(f: Symbol*) = extremum(false, (f -> f))
+  def min(f: Symbol*) = extremum(false, (fields(f) -> fields(f)))
 
   /**
    * Similar to the scala.collection.Iterable.mkString
@@ -288,7 +288,7 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
 
   //Same as reduce(f->f)
   def reduce[T](fieldDef: Symbol*)(fn: (T, T) => T)(implicit setter: TupleSetter[T], conv: TupleConverter[T]): Self = {
-    reduce(fieldDef -> fieldDef)(fn)(setter, conv)
+    reduce(fields(fieldDef) -> fields(fieldDef))(fn)(setter, conv)
   }
 
   // Abstract algebra reductions (sum, times, dot):
@@ -309,7 +309,7 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
    * Assumed to be a commutative operation. If you don't want that, use .forceToReducers
    */
   def sum[T](fs: Symbol*)(implicit sg: Semigroup[T], tconv: TupleConverter[T], tset: TupleSetter[T]): Self =
-    sum[T](fs -> fs)(sg, tconv, tset)
+    sum[T](fields(fs) -> fields(fs))(sg, tconv, tset)
 
   /**
    * Returns the product of all the items in this grouping
@@ -324,7 +324,7 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
    * The same as `times(fs -> fs)`
    */
   def times[T](fs: Symbol*)(implicit ring: Ring[T], tconv: TupleConverter[T], tset: TupleSetter[T]): Self = {
-    times[T](fs -> fs)(ring, tconv, tset)
+    times[T](fields(fs) -> fields(fs))(ring, tconv, tset)
   }
 
   /**
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Combinatorics.scala b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Combinatorics.scala
index 2182a7c2f6..0d6d4d8b61 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Combinatorics.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Combinatorics.scala
@@ -202,7 +202,7 @@ object Combinatorics {
         }.filter('temp){ x: Double =>
           if (allc.size == numWeights) (math.abs(x - result) <= error) else (x <= result)
         }.discard('temp), allc)
-      })._1.unique(allColumns)
+      })._1.unique(fields(allColumns))
 
     (1 to numWeights).zip(weights).foldLeft(res) ((a, b) => {
       val (num, wt) = b
@@ -218,7 +218,7 @@ object Combinatorics {
   def positiveWeightedSum(weights: IndexedSeq[Double], result: Double, error: Double)(implicit flowDef: FlowDef, mode: Mode): Pipe = {
     val allColumns = (1 to weights.size).map(x => Symbol("k" + x))
     weightedSum(weights, result, error)
-      .filter(allColumns) { x: TupleEntry =>
+      .filter(fields(allColumns)) { x: TupleEntry =>
         (0 until allColumns.size).forall { i => x.getDouble(java.lang.Integer.valueOf(i)) != 0.0 }
       }
   }
diff --git a/scalding-core/src/test/scala/com/twitter/scalding/FieldImpsTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/FieldImpsTest.scala
index ba772559ed..3e03462836 100644
--- a/scalding-core/src/test/scala/com/twitter/scalding/FieldImpsTest.scala
+++ b/scalding-core/src/test/scala/com/twitter/scalding/FieldImpsTest.scala
@@ -19,6 +19,8 @@ import cascading.tuple.Fields
 
 import org.scalatest.{ Matchers, WordSpec }
 
+import DeprecatedFieldConversions._
+
 class FieldImpsTest extends WordSpec with Matchers with FieldConversions {
   def setAndCheck[T <: Comparable[_]](v: T)(implicit conv: (T) => Fields): Unit = {
     conv(v) shouldBe (new Fields(v))