From 27e21fb71de80428b0a927920b0d012cbbcd7c98 Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Tue, 28 Mar 2017 14:40:35 -1000 Subject: [PATCH 1/2] Remove several implicits for TraversableOnce to Fields --- .../twitter/scalding/FieldConversions.scala | 88 ++++++++++--------- .../twitter/scalding/ReduceOperations.scala | 14 +-- .../scalding/mathematics/Combinatorics.scala | 4 +- .../com/twitter/scalding/FieldImpsTest.scala | 15 ++-- 4 files changed, 61 insertions(+), 60 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/FieldConversions.scala b/scalding-core/src/main/scala/com/twitter/scalding/FieldConversions.scala index 41a72d566d..be2c9228c6 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/FieldConversions.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/FieldConversions.scala @@ -24,6 +24,21 @@ import scala.annotation.tailrec import scala.collection.JavaConverters._ trait LowPriorityFieldConversions { + /** + * Useful to convert f : Any* to Fields. This handles mixed cases ("hey", 'you). + * Not sure we should be this flexible, but given that Cascading will throw an + * exception before scheduling the job, I guess this is okay. + */ + def parseAnySeqToFields[T <: TraversableOnce[Any]](anyf: T) = { + val fields = new Fields(anyf.toSeq.map { anyToFieldArg }: _*) + anyf.foreach { + _ match { + case field: Field[_] => fields.setComparator(field.id, field.ord) + case _ => + } + } + fields + } protected def anyToFieldArg(f: Any): Comparable[_] = f match { case x: Symbol => x.name @@ -49,15 +64,18 @@ trait LowPriorityFieldConversions { * Lists are handled by an implicit in FieldConversions, which have * higher priority. */ - implicit def productToFields(f: Product) = { - val fields = new Fields(f.productIterator.map { anyToFieldArg }.toSeq: _*) - f.productIterator.foreach { - _ match { + implicit def productToFields(f: Product): Fields = f match { + case l: List[_] => + // List is a common product, unfortunately this is a very dangerous + // implicit since so many things extends product... + parseAnySeqToFields(l) + case _ => + val fields = new Fields(f.productIterator.map(anyToFieldArg).toSeq: _*) + f.productIterator.foreach { case field: Field[_] => fields.setComparator(field.id, field.ord) - case _ => + case _ => () } - } - fields + fields } } @@ -112,7 +130,7 @@ trait FieldConversions extends LowPriorityFieldConversions { //Single entry fields: implicit def unitToFields(u: Unit) = Fields.NONE // linter:ignore - implicit def intToFields(x: Int) = new Fields(new java.lang.Integer(x)) + implicit def intToFields(x: Int) = integerToFields(Integer.valueOf(x)) implicit def integerToFields(x: java.lang.Integer) = new Fields(x) implicit def stringToFields(x: String) = new Fields(x) implicit def enumValueToFields(x: Enumeration#Value) = new Fields(x.toString) @@ -171,27 +189,12 @@ trait FieldConversions extends LowPriorityFieldConversions { implicit def fromEnum[T <: Enumeration](enumeration: T): Fields = new Fields(enumeration.values.toList.map { _.toString }: _*) - implicit def fields[T <: TraversableOnce[Symbol]](f: T) = new Fields(f.toSeq.map(_.name): _*) - implicit def strFields[T <: TraversableOnce[String]](f: T) = new Fields(f.toSeq: _*) - implicit def intFields[T <: TraversableOnce[Int]](f: T) = { - new Fields(f.toSeq.map { new java.lang.Integer(_) }: _*) - } + def fields[T <: TraversableOnce[Symbol]](f: T) = new Fields(f.toSeq.map(_.name): _*) + def strFields[T <: TraversableOnce[String]](f: T) = new Fields(f.toSeq: _*) + def intFields[T <: TraversableOnce[Int]](f: T) = + new Fields(f.toSeq.map(Integer.valueOf): _*) + implicit def fieldFields[T <: TraversableOnce[Field[_]]](f: T) = RichFields(f.toSeq) - /** - * Useful to convert f : Any* to Fields. This handles mixed cases ("hey", 'you). - * Not sure we should be this flexible, but given that Cascading will throw an - * exception before scheduling the job, I guess this is okay. - */ - implicit def parseAnySeqToFields[T <: TraversableOnce[Any]](anyf: T) = { - val fields = new Fields(anyf.toSeq.map { anyToFieldArg }: _*) - anyf.foreach { - _ match { - case field: Field[_] => fields.setComparator(field.id, field.ord) - case _ => - } - } - fields - } //Handle a pair generally: implicit def tuple2ToFieldsPair[T, U](pair: (T, U))(implicit tf: T => Fields, uf: U => Fields): (Fields, Fields) = { @@ -203,7 +206,7 @@ trait FieldConversions extends LowPriorityFieldConversions { * We can't set the field Manifests because cascading doesn't (yet) expose field type information * in the Fields API. */ - implicit def fieldsToRichFields(fields: Fields): RichFields = { + implicit def fieldsToRichFields(fields: Fields): RichFields = if (!fields.isDefined) { // TODO We could provide a reasonable conversion here by designing a rich type hierarchy such as // Fields @@ -215,20 +218,21 @@ trait FieldConversions extends LowPriorityFieldConversions { // of the appropriate type sys.error("virtual Fields cannot be converted to RichFields") } + else { - // This bit is kludgy because cascading provides different interfaces for extracting - // IDs and Comparators from a Fields instance. (The IDs are only available - // "one at a time" by querying for a specific index, while the Comparators are only - // available "all at once" by calling getComparators.) + // This bit is kludgy because cascading provides different interfaces for extracting + // IDs and Comparators from a Fields instance. (The IDs are only available + // "one at a time" by querying for a specific index, while the Comparators are only + // available "all at once" by calling getComparators.) - new RichFields(asList(fields).zip(fields.getComparators).map { - case (id: Comparable[_], comparator: Comparator[_]) => id match { - case x: java.lang.Integer => IntField(x)(Ordering.comparatorToOrdering(comparator), None) - case y: String => StringField(y)(Ordering.comparatorToOrdering(comparator), None) - case z => sys.error("not expecting object of type " + z.getClass + " as field name") - } - }) - } + new RichFields(asList(fields).zip(fields.getComparators).map { + case (id: Comparable[_], comparator: Comparator[_]) => id match { + case x: java.lang.Integer => IntField(x)(Ordering.comparatorToOrdering(comparator), None) + case y: String => StringField(y)(Ordering.comparatorToOrdering(comparator), None) + case z => sys.error("not expecting object of type " + z.getClass + " as field name") + } + }) + } } @@ -238,7 +242,7 @@ trait FieldConversions extends LowPriorityFieldConversions { // val myFields: Fields = ... // myFields.toFieldList -case class RichFields(val toFieldList: List[Field[_]]) extends Fields(toFieldList.map { _.id }: _*) { +case class RichFields(toFieldList: List[Field[_]]) extends Fields(toFieldList.map { _.id }: _*) { toFieldList.foreach { field: Field[_] => setComparator(field.id, field.ord) } } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/ReduceOperations.scala b/scalding-core/src/main/scala/com/twitter/scalding/ReduceOperations.scala index 3b601b78d5..3580810a2d 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/ReduceOperations.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/ReduceOperations.scala @@ -193,13 +193,13 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ //CTuple's have unknown arity so we have to put them into a Tuple1 in the middle phase: mapReduceMap(fd) { ctuple: CTuple => Tuple1(ctuple) } { (oldVal, newVal) => oldVal } { result => result._1 } } - def head(f: Symbol*): Self = head(f -> f) + def head(f: Symbol*): Self = head(fields(f) -> fields(f)) def last(fd: (Fields, Fields)) = { //CTuple's have unknown arity so we have to put them into a Tuple1 in the middle phase: mapReduceMap(fd) { ctuple: CTuple => Tuple1(ctuple) } { (oldVal, newVal) => newVal } { result => result._1 } } - def last(f: Symbol*): Self = last(f -> f) + def last(f: Symbol*): Self = last(fields(f) -> fields(f)) /** * Collect all the values into a List[T] and then operate on that @@ -239,9 +239,9 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ mapReduceMap(fieldDef) { ctuple: CTuple => Tuple1(ctuple) } { (oldVal, newVal) => if (select(oldVal._1, newVal._1)) oldVal else newVal } { result => result._1 } } def max(fieldDef: (Fields, Fields)) = extremum(true, fieldDef) - def max(f: Symbol*) = extremum(true, (f -> f)) + def max(f: Symbol*) = extremum(true, (fields(f) -> fields(f))) def min(fieldDef: (Fields, Fields)) = extremum(false, fieldDef) - def min(f: Symbol*) = extremum(false, (f -> f)) + def min(f: Symbol*) = extremum(false, (fields(f) -> fields(f))) /** * Similar to the scala.collection.Iterable.mkString @@ -288,7 +288,7 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ //Same as reduce(f->f) def reduce[T](fieldDef: Symbol*)(fn: (T, T) => T)(implicit setter: TupleSetter[T], conv: TupleConverter[T]): Self = { - reduce(fieldDef -> fieldDef)(fn)(setter, conv) + reduce(fields(fieldDef) -> fields(fieldDef))(fn)(setter, conv) } // Abstract algebra reductions (sum, times, dot): @@ -309,7 +309,7 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ * Assumed to be a commutative operation. If you don't want that, use .forceToReducers */ def sum[T](fs: Symbol*)(implicit sg: Semigroup[T], tconv: TupleConverter[T], tset: TupleSetter[T]): Self = - sum[T](fs -> fs)(sg, tconv, tset) + sum[T](fields(fs) -> fields(fs))(sg, tconv, tset) /** * Returns the product of all the items in this grouping @@ -324,7 +324,7 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ * The same as `times(fs -> fs)` */ def times[T](fs: Symbol*)(implicit ring: Ring[T], tconv: TupleConverter[T], tset: TupleSetter[T]): Self = { - times[T](fs -> fs)(ring, tconv, tset) + times[T](fields(fs) -> fields(fs))(ring, tconv, tset) } /** diff --git a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Combinatorics.scala b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Combinatorics.scala index 2182a7c2f6..0d6d4d8b61 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Combinatorics.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Combinatorics.scala @@ -202,7 +202,7 @@ object Combinatorics { }.filter('temp){ x: Double => if (allc.size == numWeights) (math.abs(x - result) <= error) else (x <= result) }.discard('temp), allc) - })._1.unique(allColumns) + })._1.unique(fields(allColumns)) (1 to numWeights).zip(weights).foldLeft(res) ((a, b) => { val (num, wt) = b @@ -218,7 +218,7 @@ object Combinatorics { def positiveWeightedSum(weights: IndexedSeq[Double], result: Double, error: Double)(implicit flowDef: FlowDef, mode: Mode): Pipe = { val allColumns = (1 to weights.size).map(x => Symbol("k" + x)) weightedSum(weights, result, error) - .filter(allColumns) { x: TupleEntry => + .filter(fields(allColumns)) { x: TupleEntry => (0 until allColumns.size).forall { i => x.getDouble(java.lang.Integer.valueOf(i)) != 0.0 } } } diff --git a/scalding-core/src/test/scala/com/twitter/scalding/FieldImpsTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/FieldImpsTest.scala index ba772559ed..4b28f2f425 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/FieldImpsTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/FieldImpsTest.scala @@ -30,7 +30,7 @@ class FieldImpsTest extends WordSpec with Matchers with FieldConversions { (v: Fields) shouldBe (new Fields(v.toString.tail)) } def setAndCheckSymS(v: Seq[Symbol]): Unit = { - (v: Fields) shouldBe (new Fields(v.map(_.toString.tail): _*)) + fields(v) shouldBe (new Fields(v.map(_.toString.tail): _*)) } def setAndCheckField(v: Field[_]): Unit = { val vF: Fields = v @@ -48,7 +48,7 @@ class FieldImpsTest extends WordSpec with Matchers with FieldConversions { (v: Fields) shouldBe (new Fields(v.toString)) } def setAndCheckEnumValueS(v: Seq[Enumeration#Value]): Unit = { - (v: Fields) shouldBe (new Fields(v.map(_.toString): _*)) + (parseAnySeqToFields(v)) shouldBe (new Fields(v.map(_.toString): _*)) } def checkFieldsWithComparators(actual: Fields, expected: Fields): Unit = { // sometimes one or the other is actually a RichFields, so rather than test for @@ -94,15 +94,12 @@ class FieldImpsTest extends WordSpec with Matchers with FieldConversions { "convert from ints" in { setAndCheck(int2Integer(0)) setAndCheck(int2Integer(5)) - setAndCheckS(List(1, 23, 3, 4).map(int2Integer)) - setAndCheckS((0 until 10).map(int2Integer)) } "convert from strings" in { setAndCheck("hey") setAndCheck("world") - setAndCheckS(List("one", "two", "three")) - //Synonym for list - setAndCheckS(Seq("one", "two", "three")) + setAndCheckS(List("one", "two", "three"))(strFields) + setAndCheckS(Seq("one", "two", "three"))(strFields) } "convert from symbols" in { setAndCheckSym('hey) @@ -200,7 +197,7 @@ class FieldImpsTest extends WordSpec with Matchers with FieldConversions { f2 = 'hey -> 'you f2 shouldBe (new Fields("hey"), new Fields("you")) - f2 = (0 until 10) -> 'you + f2 = intFields((0 until 10)) -> 'you f2 shouldBe (new Fields((0 until 10).map(int2Integer): _*), new Fields("you")) f2 = (('hey, 'world) -> 'other) @@ -224,7 +221,7 @@ class FieldImpsTest extends WordSpec with Matchers with FieldConversions { fields.setComparators(foo.ord) f2 shouldBe (fields, new Fields("bar", "bell")) - f2 = Seq("one", "two", "three") -> Seq("1", "2", "3") + f2 = strFields(Seq("one", "two", "three")) -> strFields(Seq("1", "2", "3")) f2 shouldBe (new Fields("one", "two", "three"), new Fields("1", "2", "3")) f2 = List("one", "two", "three") -> List("1", "2", "3") f2 shouldBe (new Fields("one", "two", "three"), new Fields("1", "2", "3")) From 8f4871303e87a45fa8fbeed6fecbfdb60babb408 Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Tue, 30 Jan 2018 07:48:15 -1000 Subject: [PATCH 2/2] revert changes --- .../src/test/scala/com/twitter/scalding/FieldImpsTest.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/FieldImpsTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/FieldImpsTest.scala index ba772559ed..3e03462836 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/FieldImpsTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/FieldImpsTest.scala @@ -19,6 +19,8 @@ import cascading.tuple.Fields import org.scalatest.{ Matchers, WordSpec } +import DeprecatedFieldConversions._ + class FieldImpsTest extends WordSpec with Matchers with FieldConversions { def setAndCheck[T <: Comparable[_]](v: T)(implicit conv: (T) => Fields): Unit = { conv(v) shouldBe (new Fields(v))