From 8c3948b9f36e52b9b4bcb8a4d901fe86e5b6bb6a Mon Sep 17 00:00:00 2001 From: ianoc Date: Tue, 26 Sep 2017 16:48:55 -0700 Subject: [PATCH 1/6] PullByteBufferOut as a default ordering --- .../serialization/ComplexHelper.scala | 105 ++++++++++++++++++ .../DefaultOrderedSerialization.scala | 23 ++++ .../scalding/serialization/Exported.scala | 18 +++ .../HasUnsafeCompareBinary.scala | 59 ++++++++++ .../serialization/OrderedSerialization.scala | 7 +- .../OrderedSerializationByteBuffer.scala | 48 ++++++++ .../serialization/Serialization.scala | 6 +- .../serialization/macros/ExportMacros.scala | 24 ++++ .../impl/OrderedBufferableProviderImpl.scala | 2 - .../providers/ByteBufferOrderedBuf.scala | 102 ----------------- .../macros/MacroOrderingProperties.scala | 4 +- 11 files changed, 291 insertions(+), 107 deletions(-) create mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/ComplexHelper.scala create mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/DefaultOrderedSerialization.scala create mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Exported.scala create mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/HasUnsafeCompareBinary.scala create mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerializationByteBuffer.scala create mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/ExportMacros.scala delete mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/ordered_serialization/providers/ByteBufferOrderedBuf.scala diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/ComplexHelper.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/ComplexHelper.scala new file mode 100644 index 0000000000..fccaffd2ba --- /dev/null +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/ComplexHelper.scala @@ -0,0 +1,105 @@ +/* +Copyright 2015 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding.serialization + +import com.twitter.scalding.serialization.JavaStreamEnrichments._ + +import java.io.{ ByteArrayInputStream, InputStream, OutputStream } +import scala.util.{ Failure, Success, Try } +import scala.util.control.NonFatal + +abstract class ComplexHelper[T] extends HasUnsafeCompareBinary[T] { + def staticSize: Option[Int] = None + + protected def dynamicSizeWithoutLen(e: T): Option[Int] + final def dynamicSize(e: T) = + if (staticSize.isDefined) staticSize + else + dynamicSizeWithoutLen(e).map { e => + e + posVarIntSize(e) + } + final def unsafeSize(t: T): Option[Int] = dynamicSizeWithoutLen(t) + + /** + * This is the worst case: we have to serialize in a side buffer + * and then see how large it actually is. This happens for cases, like + * string, where the cost to see the serialized size is not cheaper than + * directly serializing. + */ + private[this] def noLengthWrite(element: T, outerOutputStream: OutputStream): Unit = { + // Start with pretty big buffers because reallocation will be expensive + val baos = new java.io.ByteArrayOutputStream(512) + unsafeWrite(baos, element) + val len = baos.size + outerOutputStream.writePosVarInt(len) + baos.writeTo(outerOutputStream) + } + + final override def write(into: java.io.OutputStream, e: T): Try[Unit] = + try { + if (staticSize.isDefined) { + unsafeWrite(into, e) + } else { + val dynSiz = dynamicSizeWithoutLen(e) + dynSiz match { + case Some(innerSiz) => + + into.writePosVarInt(innerSiz) + unsafeWrite(into, e) + case None => + noLengthWrite(e, into) + } + } + com.twitter.scalding.serialization.Serialization.successUnit + } catch { + case scala.util.control.NonFatal(e) => + scala.util.Failure(e) + } + + final def read(in: InputStream): Try[T] = + try { + if (staticSize.isEmpty) + in.readPosVarInt + + _root_.scala.util.Success(unsafeRead(in)) + } catch { + case _root_.scala.util.control.NonFatal(e) => + _root_.scala.util.Failure(e) + } + + final def compareBinary(inputStreamA: InputStream, + inputStreamB: InputStream): OrderedSerialization.Result = + try com.twitter.scalding.serialization.OrderedSerialization.resultFrom { + val lenA = staticSize.getOrElse(inputStreamA.readPosVarInt) + val lenB = staticSize.getOrElse(inputStreamB.readPosVarInt) + + val posStreamA = com.twitter.scalding.serialization.PositionInputStream(inputStreamA) + val initialPositionA = posStreamA.position + + val posStreamB = com.twitter.scalding.serialization.PositionInputStream(inputStreamB) + val initialPositionB = posStreamB.position + + val innerR = unsafeCompareBinary(posStreamA, posStreamB) + + posStreamA.seekToPosition(initialPositionA + lenA) + posStreamB.seekToPosition(initialPositionB + lenB) + innerR + } catch { + case scala.util.control.NonFatal(e) => + com.twitter.scalding.serialization.OrderedSerialization.CompareFailure(e) + } + +} diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/DefaultOrderedSerialization.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/DefaultOrderedSerialization.scala new file mode 100644 index 0000000000..02047dba34 --- /dev/null +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/DefaultOrderedSerialization.scala @@ -0,0 +1,23 @@ +/* +Copyright 2015 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding.serialization + +import java.nio.ByteBuffer + +object DefaultOrderedSerialization { + implicit def byteBuffer: Exported[OrderedSerialization[ByteBuffer]] = Exported(OrderedSerializationByteBuffer) +} +trait DefaultOrderedSerialization[T] extends OrderedSerialization[T] diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Exported.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Exported.scala new file mode 100644 index 0000000000..fc2e0895ef --- /dev/null +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Exported.scala @@ -0,0 +1,18 @@ +/* +Copyright 2015 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding.serialization + +case class Exported[T](instance: T) extends AnyVal diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/HasUnsafeCompareBinary.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/HasUnsafeCompareBinary.scala new file mode 100644 index 0000000000..8ae7cd3a71 --- /dev/null +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/HasUnsafeCompareBinary.scala @@ -0,0 +1,59 @@ +/* +Copyright 2015 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding.serialization + +import com.twitter.scalding.serialization.JavaStreamEnrichments._ +import java.io.InputStream +import java.io.OutputStream + +trait HasUnsafeCompareBinary[T] extends OrderedSerialization[T] { + def unsafeCompareBinary(inputStreamA: InputStream, inputStreamB: InputStream): Int + def unsafeWrite(out: java.io.OutputStream, t: T): Unit + def unsafeRead(in: java.io.InputStream): T + def unsafeSize(t: T): Option[Int] +} + +object HasUnsafeCompareBinary { + def apply[T](ord: OrderedSerialization[T]): HasUnsafeCompareBinary[T] = ord match { + case e: HasUnsafeCompareBinary[T] => e + case o => + new HasUnsafeCompareBinary[T] { + def unsafeCompareBinary(inputStreamA: InputStream, inputStreamB: InputStream): Int = + o.compareBinary(inputStreamA, inputStreamB).unsafeToInt + + def hash(x: T): Int = o.hash(x) + + // Members declared in com.twitter.scalding.serialization.OrderedSerialization + def compareBinary(a: java.io.InputStream, b: java.io.InputStream): com.twitter.scalding.serialization.OrderedSerialization.Result = + o.compareBinary(a, b) + + // Members declared in scala.math.Ordering + def compare(x: T, y: T): Int = o.compare(x, y) + + def dynamicSize(e: T) = o.dynamicSize(e) + def unsafeSize(t: T): Option[Int] = o.dynamicSize(t) + + // Members declared in com.twitter.scalding.serialization.Serialization + def read(in: java.io.InputStream): scala.util.Try[T] = o.read(in) + def staticSize: Option[Int] = o.staticSize + def unsafeWrite(out: java.io.OutputStream, t: T): Unit = o.write(out, t).get + + def unsafeRead(in: java.io.InputStream): T = o.read(in).get + + def write(out: java.io.OutputStream, t: T): scala.util.Try[Unit] = o.write(out, t) + } + } +} diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala index 462b71612d..4cb2764b7e 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala @@ -34,7 +34,7 @@ trait OrderedSerialization[T] extends Ordering[T] with Serialization[T] { def compareBinary(a: InputStream, b: InputStream): OrderedSerialization.Result } -object OrderedSerialization { +object OrderedSerialization extends LowPriorityOrderedSerialization { /** * Represents the result of a comparison that might fail due * to an error deserializing @@ -214,3 +214,8 @@ final case class DeserializingOrderedSerialization[T](serialization: Serializati final override def staticSize = serialization.staticSize final override def dynamicSize(t: T) = serialization.dynamicSize(t) } + +private[serialization] trait LowPriorityOrderedSerialization { + implicit final def importedOrderedSerialization[A](implicit exported: Exported[OrderedSerialization[A]]): OrderedSerialization[A] = exported.instance +} + diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerializationByteBuffer.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerializationByteBuffer.scala new file mode 100644 index 0000000000..b459692578 --- /dev/null +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerializationByteBuffer.scala @@ -0,0 +1,48 @@ +package com.twitter.scalding.serialization + +import com.twitter.scalding.serialization.JavaStreamEnrichments._ +import java.nio.ByteBuffer +import java.io.InputStream + +object OrderedSerializationByteBuffer extends ComplexHelper[ByteBuffer] with DefaultOrderedSerialization[ByteBuffer] { + def hash(x: ByteBuffer): Int = + x.hashCode + + def unsafeCompareBinary(inputStreamA: InputStream, inputStreamB: InputStream): Int = { + val lenA = inputStreamA.readPosVarInt + val lenB = inputStreamB.readPosVarInt + val queryLength = _root_.scala.math.min(lenA, lenB) + var incr = 0 + var state = 0 + + while (incr < queryLength && state == 0) { + state = java.lang.Byte.compare(inputStreamA.readByte, inputStreamB.readByte) + incr = incr + 1 + } + if (state == 0) { + java.lang.Integer.compare(lenA, lenB) + } else { + state + } + } + + def unsafeWrite(outputStream: java.io.OutputStream, element: ByteBuffer): Unit = { + outputStream.writePosVarInt(element.remaining) + outputStream + .writeBytes(element.array, element.arrayOffset + element.position, element.remaining) + } + + def unsafeRead(inputStream: java.io.InputStream): ByteBuffer = { + val lenA = inputStream.readPosVarInt + val bytes = new Array[Byte](lenA) + inputStream.readFully(bytes) + java.nio.ByteBuffer.wrap(bytes) + } + + def compare(a: ByteBuffer, b: ByteBuffer): Int = a.compareTo(b) + + def dynamicSizeWithoutLen(element: ByteBuffer): Option[Int] = Some { + val tmpLen = element.remaining + posVarIntSize(tmpLen) + tmpLen + } +} diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Serialization.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Serialization.scala index 5930260999..f9bf403cba 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Serialization.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Serialization.scala @@ -38,7 +38,7 @@ import scala.util.hashing.Hashing * implementation. This must satisfy: * (!equiv(a, b)) || (hash(a) == hash(b)) */ -trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable { +trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable with LowPrioritySerialization { def read(in: InputStream): Try[T] def write(out: OutputStream, t: T): Try[Unit] /** @@ -171,3 +171,7 @@ object Serialization { sizeLaw, transitivity) } + +private[serialization] trait LowPrioritySerialization { + implicit final def importedSerialization[A](implicit exported: Exported[Serialization[A]]): Serialization[A] = exported.instance +} diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/ExportMacros.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/ExportMacros.scala new file mode 100644 index 0000000000..77f4e09f6d --- /dev/null +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/ExportMacros.scala @@ -0,0 +1,24 @@ +package com.twitter.scalding.serialization.macros + +import com.twitter.scalding.serialization.Exported +import com.twitter.scalding.serialization.{OrderedSerialization, DefaultOrderedSerialization} +import scala.reflect.macros.whitebox + +class ExportMacros(val c: whitebox.Context) { + import c.universe._ + + final def exportOrderedSerialization[D[x] <: DefaultOrderedSerialization[x], A](implicit + D: c.WeakTypeTag[D[_]], + A: c.WeakTypeTag[A] + ): c.Expr[Exported[OrderedSerialization[A]]] = { + val target = appliedType(D.tpe.typeConstructor, A.tpe) + + c.typecheck(q"implictly[$target]", silent = true) match { + case EmptyTree => c.abort(c.enclosingPosition, s"Unable to infer value of type $target") + case t => c.Expr[Exported[OrderedSerialization[A]]]( + q"new _root_.com.twitter.scalding.serialization.Exported($t: _root_.com.twitter.scalding.serialization.OrderedSerialization[$A])" + ) + } + } + +} diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/OrderedBufferableProviderImpl.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/OrderedBufferableProviderImpl.scala index f86e6115fc..4b698d5939 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/OrderedBufferableProviderImpl.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/OrderedBufferableProviderImpl.scala @@ -41,7 +41,6 @@ object OrderedSerializationProviderImpl { val stringDispatcher = StringOrderedBuf.dispatch(c) val traversablesDispatcher = TraversablesOrderedBuf.dispatch(c)(buildDispatcher) val unitDispatcher = UnitOrderedBuf.dispatch(c) - val byteBufferDispatcher = ByteBufferOrderedBuf.dispatch(c) val sealedTraitDispatcher = SealedTraitOrderedBuf.dispatch(c)(buildDispatcher) OrderedSerializationProviderImpl @@ -51,7 +50,6 @@ object OrderedSerializationProviderImpl { .orElse(optionDispatcher) .orElse(eitherDispatcher) .orElse(stringDispatcher) - .orElse(byteBufferDispatcher) .orElse(traversablesDispatcher) .orElse(caseClassDispatcher) .orElse(caseObjectDispatcher) diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/ordered_serialization/providers/ByteBufferOrderedBuf.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/ordered_serialization/providers/ByteBufferOrderedBuf.scala deleted file mode 100644 index 3b9ab67574..0000000000 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/ordered_serialization/providers/ByteBufferOrderedBuf.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - Copyright 2014 Twitter, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ -package com.twitter.scalding.serialization.macros.impl.ordered_serialization.providers - -import scala.language.experimental.macros -import scala.reflect.macros.blackbox.Context - -import com.twitter.scalding._ -import com.twitter.scalding.serialization.macros.impl.ordered_serialization.{ - CompileTimeLengthTypes, - ProductLike, - TreeOrderedBuf -} -import CompileTimeLengthTypes._ - -import java.nio.ByteBuffer -import com.twitter.scalding.serialization.OrderedSerialization - -object ByteBufferOrderedBuf { - def dispatch(c: Context): PartialFunction[c.Type, TreeOrderedBuf[c.type]] = { - case tpe if tpe =:= c.universe.typeOf[ByteBuffer] => ByteBufferOrderedBuf(c)(tpe) - } - - def apply(c: Context)(outerType: c.Type): TreeOrderedBuf[c.type] = { - import c.universe._ - - def freshT(id: String) = TermName(c.freshName(id)) - - new TreeOrderedBuf[c.type] { - override val ctx: c.type = c - override val tpe = outerType - override def hash(element: ctx.TermName): ctx.Tree = q"$element.hashCode" - - override def compareBinary(inputStreamA: ctx.TermName, inputStreamB: ctx.TermName) = { - val lenA = freshT("lenA") - val lenB = freshT("lenB") - val queryLength = freshT("queryLength") - val incr = freshT("incr") - val state = freshT("state") - q""" - val $lenA: Int = $inputStreamA.readPosVarInt - val $lenB: Int = $inputStreamB.readPosVarInt - - val $queryLength = _root_.scala.math.min($lenA, $lenB) - var $incr = 0 - var $state = 0 - - while($incr < $queryLength && $state == 0) { - $state = _root_.java.lang.Byte.compare($inputStreamA.readByte, $inputStreamB.readByte) - $incr = $incr + 1 - } - if($state == 0) { - _root_.java.lang.Integer.compare($lenA, $lenB) - } else { - $state - } - """ - } - override def put(inputStream: ctx.TermName, element: ctx.TermName) = - q""" - $inputStream.writePosVarInt($element.remaining) - $inputStream.writeBytes($element.array, $element.arrayOffset + $element.position, $element.remaining) - """ - - override def get(inputStream: ctx.TermName): ctx.Tree = { - val lenA = freshT("lenA") - val bytes = freshT("bytes") - q""" - val $lenA = $inputStream.readPosVarInt - val $bytes = new Array[Byte]($lenA) - $inputStream.readFully($bytes) - _root_.java.nio.ByteBuffer.wrap($bytes) - """ - } - override def compare(elementA: ctx.TermName, elementB: ctx.TermName): ctx.Tree = q""" - $elementA.compareTo($elementB) - """ - override def length(element: Tree): CompileTimeLengthTypes[c.type] = { - val tmpLen = freshT("tmpLen") - FastLengthCalculation(c)(q""" - val $tmpLen = $element.remaining - posVarIntSize($tmpLen) + $tmpLen - """) - } - - def lazyOuterVariables: Map[String, ctx.Tree] = Map.empty - } - } -} diff --git a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala index c8ae779082..efe1681aa6 100644 --- a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala +++ b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala @@ -36,6 +36,8 @@ import com.twitter.scalding.some.other.space.space._ import scala.collection.immutable.Queue import scala.language.experimental.macros import com.twitter.scalding.serialization.macros.impl.BinaryOrdering +import com.twitter.scalding.serialization.DefaultOrderedSerialization._ +import com.twitter.scalding.serialization._ object LawTester { def apply[T: Arbitrary](laws: Iterable[Law[T]]): Prop = @@ -510,7 +512,7 @@ class MacroOrderingProperties } test("Test out ByteBuffer") { - BinaryOrdering.ordSer[ByteBuffer] + implicitly[OrderedSerialization[ByteBuffer]] check[ByteBuffer] checkCollisions[ByteBuffer] } From a12f49b15c5d1bb1ae941b46e8d938dba3e6812e Mon Sep 17 00:00:00 2001 From: ianoc Date: Wed, 27 Sep 2017 08:59:45 -0700 Subject: [PATCH 2/6] Move new code into a more useful package space. Add a priority test --- .../serialization/ComplexHelper.scala | 28 ++++----- .../DefaultOrderedSerialization.scala | 23 ------- .../scalding/serialization/Exported.scala | 4 ++ .../HasUnsafeCompareBinary.scala | 2 +- .../serialization/macros/ExportMacros.scala | 24 ------- .../OrderedSerializationByteBuffer.scala | 5 +- .../serialization/provided/package.scala | 9 +++ .../macros/MacroOrderingProperties.scala | 2 +- .../macros/ProvidedPriorityTest.scala | 62 +++++++++++++++++++ 9 files changed, 94 insertions(+), 65 deletions(-) delete mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/DefaultOrderedSerialization.scala delete mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/ExportMacros.scala rename scalding-serialization/src/main/scala/com/twitter/scalding/serialization/{ => provided}/OrderedSerializationByteBuffer.scala (92%) create mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/package.scala create mode 100644 scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/ProvidedPriorityTest.scala diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/ComplexHelper.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/ComplexHelper.scala index fccaffd2ba..95cb87883e 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/ComplexHelper.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/ComplexHelper.scala @@ -17,7 +17,7 @@ package com.twitter.scalding.serialization import com.twitter.scalding.serialization.JavaStreamEnrichments._ -import java.io.{ ByteArrayInputStream, InputStream, OutputStream } +import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream } import scala.util.{ Failure, Success, Try } import scala.util.control.NonFatal @@ -41,14 +41,14 @@ abstract class ComplexHelper[T] extends HasUnsafeCompareBinary[T] { */ private[this] def noLengthWrite(element: T, outerOutputStream: OutputStream): Unit = { // Start with pretty big buffers because reallocation will be expensive - val baos = new java.io.ByteArrayOutputStream(512) + val baos = new ByteArrayOutputStream(512) unsafeWrite(baos, element) val len = baos.size outerOutputStream.writePosVarInt(len) baos.writeTo(outerOutputStream) } - final override def write(into: java.io.OutputStream, e: T): Try[Unit] = + final override def write(into: OutputStream, e: T): Try[Unit] = try { if (staticSize.isDefined) { unsafeWrite(into, e) @@ -63,10 +63,10 @@ abstract class ComplexHelper[T] extends HasUnsafeCompareBinary[T] { noLengthWrite(e, into) } } - com.twitter.scalding.serialization.Serialization.successUnit + Serialization.successUnit } catch { - case scala.util.control.NonFatal(e) => - scala.util.Failure(e) + case NonFatal(e) => + Failure(e) } final def read(in: InputStream): Try[T] = @@ -74,22 +74,22 @@ abstract class ComplexHelper[T] extends HasUnsafeCompareBinary[T] { if (staticSize.isEmpty) in.readPosVarInt - _root_.scala.util.Success(unsafeRead(in)) + Success(unsafeRead(in)) } catch { - case _root_.scala.util.control.NonFatal(e) => - _root_.scala.util.Failure(e) + case NonFatal(e) => + Failure(e) } final def compareBinary(inputStreamA: InputStream, inputStreamB: InputStream): OrderedSerialization.Result = - try com.twitter.scalding.serialization.OrderedSerialization.resultFrom { + try OrderedSerialization.resultFrom { val lenA = staticSize.getOrElse(inputStreamA.readPosVarInt) val lenB = staticSize.getOrElse(inputStreamB.readPosVarInt) - val posStreamA = com.twitter.scalding.serialization.PositionInputStream(inputStreamA) + val posStreamA = PositionInputStream(inputStreamA) val initialPositionA = posStreamA.position - val posStreamB = com.twitter.scalding.serialization.PositionInputStream(inputStreamB) + val posStreamB = PositionInputStream(inputStreamB) val initialPositionB = posStreamB.position val innerR = unsafeCompareBinary(posStreamA, posStreamB) @@ -98,8 +98,8 @@ abstract class ComplexHelper[T] extends HasUnsafeCompareBinary[T] { posStreamB.seekToPosition(initialPositionB + lenB) innerR } catch { - case scala.util.control.NonFatal(e) => - com.twitter.scalding.serialization.OrderedSerialization.CompareFailure(e) + case NonFatal(e) => + OrderedSerialization.CompareFailure(e) } } diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/DefaultOrderedSerialization.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/DefaultOrderedSerialization.scala deleted file mode 100644 index 02047dba34..0000000000 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/DefaultOrderedSerialization.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* -Copyright 2015 Twitter, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package com.twitter.scalding.serialization - -import java.nio.ByteBuffer - -object DefaultOrderedSerialization { - implicit def byteBuffer: Exported[OrderedSerialization[ByteBuffer]] = Exported(OrderedSerializationByteBuffer) -} -trait DefaultOrderedSerialization[T] extends OrderedSerialization[T] diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Exported.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Exported.scala index fc2e0895ef..c776272f2e 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Exported.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Exported.scala @@ -15,4 +15,8 @@ limitations under the License. */ package com.twitter.scalding.serialization +// We wrap types in Exported to provide low priority implicits +// the real type has a low priority implicit to extract from Exported +// into the original type. +// See more @ https://github.com/milessabin/export-hook case class Exported[T](instance: T) extends AnyVal diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/HasUnsafeCompareBinary.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/HasUnsafeCompareBinary.scala index 8ae7cd3a71..dc47c5b7bf 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/HasUnsafeCompareBinary.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/HasUnsafeCompareBinary.scala @@ -37,7 +37,7 @@ object HasUnsafeCompareBinary { def hash(x: T): Int = o.hash(x) // Members declared in com.twitter.scalding.serialization.OrderedSerialization - def compareBinary(a: java.io.InputStream, b: java.io.InputStream): com.twitter.scalding.serialization.OrderedSerialization.Result = + def compareBinary(a: java.io.InputStream, b: java.io.InputStream): OrderedSerialization.Result = o.compareBinary(a, b) // Members declared in scala.math.Ordering diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/ExportMacros.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/ExportMacros.scala deleted file mode 100644 index 77f4e09f6d..0000000000 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/ExportMacros.scala +++ /dev/null @@ -1,24 +0,0 @@ -package com.twitter.scalding.serialization.macros - -import com.twitter.scalding.serialization.Exported -import com.twitter.scalding.serialization.{OrderedSerialization, DefaultOrderedSerialization} -import scala.reflect.macros.whitebox - -class ExportMacros(val c: whitebox.Context) { - import c.universe._ - - final def exportOrderedSerialization[D[x] <: DefaultOrderedSerialization[x], A](implicit - D: c.WeakTypeTag[D[_]], - A: c.WeakTypeTag[A] - ): c.Expr[Exported[OrderedSerialization[A]]] = { - val target = appliedType(D.tpe.typeConstructor, A.tpe) - - c.typecheck(q"implictly[$target]", silent = true) match { - case EmptyTree => c.abort(c.enclosingPosition, s"Unable to infer value of type $target") - case t => c.Expr[Exported[OrderedSerialization[A]]]( - q"new _root_.com.twitter.scalding.serialization.Exported($t: _root_.com.twitter.scalding.serialization.OrderedSerialization[$A])" - ) - } - } - -} diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerializationByteBuffer.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/OrderedSerializationByteBuffer.scala similarity index 92% rename from scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerializationByteBuffer.scala rename to scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/OrderedSerializationByteBuffer.scala index b459692578..7a4e393e87 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerializationByteBuffer.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/OrderedSerializationByteBuffer.scala @@ -1,10 +1,11 @@ -package com.twitter.scalding.serialization +package com.twitter.scalding.serialization.provided import com.twitter.scalding.serialization.JavaStreamEnrichments._ import java.nio.ByteBuffer import java.io.InputStream +import com.twitter.scalding.serialization.ComplexHelper -object OrderedSerializationByteBuffer extends ComplexHelper[ByteBuffer] with DefaultOrderedSerialization[ByteBuffer] { +object OrderedSerializationByteBuffer extends ComplexHelper[ByteBuffer] { def hash(x: ByteBuffer): Int = x.hashCode diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/package.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/package.scala new file mode 100644 index 0000000000..e39fb86947 --- /dev/null +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/package.scala @@ -0,0 +1,9 @@ +package com.twitter.scalding.serialization + +import java.nio.ByteBuffer + +import com.twitter.scalding.serialization.provided.{ OrderedSerializationByteBuffer } + +package object provided { + implicit def byteBufferOrderedSerialization: Exported[OrderedSerialization[ByteBuffer]] = Exported(OrderedSerializationByteBuffer) +} diff --git a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala index efe1681aa6..787717ce92 100644 --- a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala +++ b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala @@ -36,7 +36,7 @@ import com.twitter.scalding.some.other.space.space._ import scala.collection.immutable.Queue import scala.language.experimental.macros import com.twitter.scalding.serialization.macros.impl.BinaryOrdering -import com.twitter.scalding.serialization.DefaultOrderedSerialization._ +import com.twitter.scalding.serialization.provided._ import com.twitter.scalding.serialization._ object LawTester { diff --git a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/ProvidedPriorityTest.scala b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/ProvidedPriorityTest.scala new file mode 100644 index 0000000000..6b7b7b11f7 --- /dev/null +++ b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/ProvidedPriorityTest.scala @@ -0,0 +1,62 @@ +/* +Copyright 2012 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.twitter.scalding.serialization.macros +import scala.language.higherKinds +import java.io.{ ByteArrayOutputStream, InputStream } +import java.nio.ByteBuffer + +import com.twitter.scalding.serialization.{ + JavaStreamEnrichments, + Law, + Law1, + Law2, + Law3, + OrderedSerialization, + Serialization +} +import org.scalatest.FunSuite +import com.twitter.scalding.serialization.StringOrderedSerialization +import com.twitter.scalding.serialization._ + +object CompanionOrderedSerialization { + val v = (new StringOrderedSerialization).asInstanceOf[OrderedSerialization[SampleCaseClass]] +} +object ProvidedOrderedSerialization { + val v = (new StringOrderedSerialization).asInstanceOf[OrderedSerialization[SampleCaseClass]] +} + +object SampleCaseClass { + implicit def ordSer: OrderedSerialization[SampleCaseClass] = CompanionOrderedSerialization.v +} +case class SampleCaseClass(v: Int) + +object ProvidedImports { + implicit def aProvided: Exported[OrderedSerialization[SampleCaseClass]] = Exported(ProvidedOrderedSerialization.v) +} +class ProvidedPriorityTest + extends FunSuite { + + import ProvidedImports._ + def checkLocalOverride[T: OrderedSerialization] = { + assert(implicitly[OrderedSerialization[T]] === CompanionOrderedSerialization.v) + } + + test("Test that the imported scope doesn't override a companion object") { + checkLocalOverride[SampleCaseClass] + } + +} From e03faa4fffa75acb2c03345ec2d6652239e7fb8b Mon Sep 17 00:00:00 2001 From: ianoc Date: Wed, 27 Sep 2017 09:01:24 -0700 Subject: [PATCH 3/6] Remove copyright --- .../scala/com/twitter/scalding/serialization/Exported.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Exported.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Exported.scala index c776272f2e..c12c0046e4 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Exported.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Exported.scala @@ -1,6 +1,4 @@ /* -Copyright 2015 Twitter, Inc. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at From 7624dac9a6b2ccae9ff4c70df871bc8f83b83e1a Mon Sep 17 00:00:00 2001 From: ianoc Date: Wed, 27 Sep 2017 09:02:15 -0700 Subject: [PATCH 4/6] Might aswell use a val to avoid the allocation during setup --- .../com/twitter/scalding/serialization/provided/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/package.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/package.scala index e39fb86947..529edceb10 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/package.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/package.scala @@ -5,5 +5,5 @@ import java.nio.ByteBuffer import com.twitter.scalding.serialization.provided.{ OrderedSerializationByteBuffer } package object provided { - implicit def byteBufferOrderedSerialization: Exported[OrderedSerialization[ByteBuffer]] = Exported(OrderedSerializationByteBuffer) + implicit val byteBufferOrderedSerialization: Exported[OrderedSerialization[ByteBuffer]] = Exported(OrderedSerializationByteBuffer) } From 3bc7afbc2a0dc554f00f3047c08a1f98cc9a40cb Mon Sep 17 00:00:00 2001 From: ianoc Date: Wed, 27 Sep 2017 11:51:42 -0700 Subject: [PATCH 5/6] Alternate approach --- .../serialization/ComplexHelper.scala | 105 -- .../HasUnsafeCompareBinary.scala | 59 -- .../serialization/OrderedSerialization.scala | 8 + .../serialization/Serialization.scala | 6 + .../OrderedSerializationByteBuffer.scala | 32 +- .../provided/UnsafeCompareBinary.scala | 74 ++ .../macros/MacroOrderingProperties.scala | 910 +++++++++--------- 7 files changed, 570 insertions(+), 624 deletions(-) delete mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/ComplexHelper.scala delete mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/HasUnsafeCompareBinary.scala create mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/UnsafeCompareBinary.scala diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/ComplexHelper.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/ComplexHelper.scala deleted file mode 100644 index 95cb87883e..0000000000 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/ComplexHelper.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* -Copyright 2015 Twitter, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package com.twitter.scalding.serialization - -import com.twitter.scalding.serialization.JavaStreamEnrichments._ - -import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream } -import scala.util.{ Failure, Success, Try } -import scala.util.control.NonFatal - -abstract class ComplexHelper[T] extends HasUnsafeCompareBinary[T] { - def staticSize: Option[Int] = None - - protected def dynamicSizeWithoutLen(e: T): Option[Int] - final def dynamicSize(e: T) = - if (staticSize.isDefined) staticSize - else - dynamicSizeWithoutLen(e).map { e => - e + posVarIntSize(e) - } - final def unsafeSize(t: T): Option[Int] = dynamicSizeWithoutLen(t) - - /** - * This is the worst case: we have to serialize in a side buffer - * and then see how large it actually is. This happens for cases, like - * string, where the cost to see the serialized size is not cheaper than - * directly serializing. - */ - private[this] def noLengthWrite(element: T, outerOutputStream: OutputStream): Unit = { - // Start with pretty big buffers because reallocation will be expensive - val baos = new ByteArrayOutputStream(512) - unsafeWrite(baos, element) - val len = baos.size - outerOutputStream.writePosVarInt(len) - baos.writeTo(outerOutputStream) - } - - final override def write(into: OutputStream, e: T): Try[Unit] = - try { - if (staticSize.isDefined) { - unsafeWrite(into, e) - } else { - val dynSiz = dynamicSizeWithoutLen(e) - dynSiz match { - case Some(innerSiz) => - - into.writePosVarInt(innerSiz) - unsafeWrite(into, e) - case None => - noLengthWrite(e, into) - } - } - Serialization.successUnit - } catch { - case NonFatal(e) => - Failure(e) - } - - final def read(in: InputStream): Try[T] = - try { - if (staticSize.isEmpty) - in.readPosVarInt - - Success(unsafeRead(in)) - } catch { - case NonFatal(e) => - Failure(e) - } - - final def compareBinary(inputStreamA: InputStream, - inputStreamB: InputStream): OrderedSerialization.Result = - try OrderedSerialization.resultFrom { - val lenA = staticSize.getOrElse(inputStreamA.readPosVarInt) - val lenB = staticSize.getOrElse(inputStreamB.readPosVarInt) - - val posStreamA = PositionInputStream(inputStreamA) - val initialPositionA = posStreamA.position - - val posStreamB = PositionInputStream(inputStreamB) - val initialPositionB = posStreamB.position - - val innerR = unsafeCompareBinary(posStreamA, posStreamB) - - posStreamA.seekToPosition(initialPositionA + lenA) - posStreamB.seekToPosition(initialPositionB + lenB) - innerR - } catch { - case NonFatal(e) => - OrderedSerialization.CompareFailure(e) - } - -} diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/HasUnsafeCompareBinary.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/HasUnsafeCompareBinary.scala deleted file mode 100644 index dc47c5b7bf..0000000000 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/HasUnsafeCompareBinary.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* -Copyright 2015 Twitter, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package com.twitter.scalding.serialization - -import com.twitter.scalding.serialization.JavaStreamEnrichments._ -import java.io.InputStream -import java.io.OutputStream - -trait HasUnsafeCompareBinary[T] extends OrderedSerialization[T] { - def unsafeCompareBinary(inputStreamA: InputStream, inputStreamB: InputStream): Int - def unsafeWrite(out: java.io.OutputStream, t: T): Unit - def unsafeRead(in: java.io.InputStream): T - def unsafeSize(t: T): Option[Int] -} - -object HasUnsafeCompareBinary { - def apply[T](ord: OrderedSerialization[T]): HasUnsafeCompareBinary[T] = ord match { - case e: HasUnsafeCompareBinary[T] => e - case o => - new HasUnsafeCompareBinary[T] { - def unsafeCompareBinary(inputStreamA: InputStream, inputStreamB: InputStream): Int = - o.compareBinary(inputStreamA, inputStreamB).unsafeToInt - - def hash(x: T): Int = o.hash(x) - - // Members declared in com.twitter.scalding.serialization.OrderedSerialization - def compareBinary(a: java.io.InputStream, b: java.io.InputStream): OrderedSerialization.Result = - o.compareBinary(a, b) - - // Members declared in scala.math.Ordering - def compare(x: T, y: T): Int = o.compare(x, y) - - def dynamicSize(e: T) = o.dynamicSize(e) - def unsafeSize(t: T): Option[Int] = o.dynamicSize(t) - - // Members declared in com.twitter.scalding.serialization.Serialization - def read(in: java.io.InputStream): scala.util.Try[T] = o.read(in) - def staticSize: Option[Int] = o.staticSize - def unsafeWrite(out: java.io.OutputStream, t: T): Unit = o.write(out, t).get - - def unsafeRead(in: java.io.InputStream): T = o.read(in).get - - def write(out: java.io.OutputStream, t: T): scala.util.Try[Unit] = o.write(out, t) - } - } -} diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala index 4cb2764b7e..d4ba6611c8 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala @@ -32,6 +32,14 @@ trait OrderedSerialization[T] extends Ordering[T] with Serialization[T] { * the InputStreams is mutated to be the end of the record. */ def compareBinary(a: InputStream, b: InputStream): OrderedSerialization.Result + + /** + * This compares two InputStreams. After this call, the position in + * the InputStreams may or may not be at the end of the record. + */ + def compareBinaryNoConsume(a: InputStream, b: InputStream): OrderedSerialization.Result = { + compareBinary(a, b) + } } object OrderedSerialization extends LowPriorityOrderedSerialization { diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Serialization.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Serialization.scala index f9bf403cba..0cbd0a47aa 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Serialization.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/Serialization.scala @@ -41,6 +41,7 @@ import scala.util.hashing.Hashing trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable with LowPrioritySerialization { def read(in: InputStream): Try[T] def write(out: OutputStream, t: T): Try[Unit] + /** * If all items have a static size, this returns Some, else None * NOTE: lawful implementations that return Some here much return @@ -53,6 +54,11 @@ trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable with L * otherwise the caller should just serialize into an ByteArrayOutputStream */ def dynamicSize(t: T): Option[Int] + + // Override this to provide more efficient + def skip(in: InputStream): Try[Unit] = { + read(in).map{ _ => () } + } } /** diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/OrderedSerializationByteBuffer.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/OrderedSerializationByteBuffer.scala index 7a4e393e87..8b2ef36c33 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/OrderedSerializationByteBuffer.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/OrderedSerializationByteBuffer.scala @@ -3,16 +3,23 @@ package com.twitter.scalding.serialization.provided import com.twitter.scalding.serialization.JavaStreamEnrichments._ import java.nio.ByteBuffer import java.io.InputStream -import com.twitter.scalding.serialization.ComplexHelper +import com.twitter.scalding.serialization.Serialization +import com.twitter.scalding.serialization.JavaStreamEnrichments._ +import scala.math.min +import scala.util.{ Failure, Try } +import scala.util.control.NonFatal + +object OrderedSerializationByteBuffer extends UnsafeOrderedSerialization[ByteBuffer] { + + override def staticSize: Option[Int] = None -object OrderedSerializationByteBuffer extends ComplexHelper[ByteBuffer] { def hash(x: ByteBuffer): Int = x.hashCode - def unsafeCompareBinary(inputStreamA: InputStream, inputStreamB: InputStream): Int = { + override def unsafeSwitchingCompareBinaryNoConsume(inputStreamA: InputStream, inputStreamB: InputStream, consumeToEnd: Boolean): Int = { val lenA = inputStreamA.readPosVarInt val lenB = inputStreamB.readPosVarInt - val queryLength = _root_.scala.math.min(lenA, lenB) + val queryLength = scala.math.min(lenA, lenB) var incr = 0 var state = 0 @@ -20,6 +27,10 @@ object OrderedSerializationByteBuffer extends ComplexHelper[ByteBuffer] { state = java.lang.Byte.compare(inputStreamA.readByte, inputStreamB.readByte) incr = incr + 1 } + if (consumeToEnd) { + inputStreamA.skip(lenA - incr) + inputStreamB.skip(lenB - incr) + } if (state == 0) { java.lang.Integer.compare(lenA, lenB) } else { @@ -27,6 +38,17 @@ object OrderedSerializationByteBuffer extends ComplexHelper[ByteBuffer] { } } + override def skip(inputStream: InputStream): Try[Unit] = { + try { + val lenA = inputStream.readPosVarInt + inputStream.skip(lenA) + Serialization.successUnit + } catch { + case NonFatal(e) => + Failure(e) + } + } + def unsafeWrite(outputStream: java.io.OutputStream, element: ByteBuffer): Unit = { outputStream.writePosVarInt(element.remaining) outputStream @@ -42,7 +64,7 @@ object OrderedSerializationByteBuffer extends ComplexHelper[ByteBuffer] { def compare(a: ByteBuffer, b: ByteBuffer): Int = a.compareTo(b) - def dynamicSizeWithoutLen(element: ByteBuffer): Option[Int] = Some { + def dynamicSize(element: ByteBuffer): Option[Int] = Some { val tmpLen = element.remaining posVarIntSize(tmpLen) + tmpLen } diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/UnsafeCompareBinary.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/UnsafeCompareBinary.scala new file mode 100644 index 0000000000..c444edf54d --- /dev/null +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/UnsafeCompareBinary.scala @@ -0,0 +1,74 @@ +/* +Copyright 2015 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding.serialization.provided + +import com.twitter.scalding.serialization.JavaStreamEnrichments._ + +import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream } +import scala.util.{ Failure, Success, Try } +import scala.util.control.NonFatal +import com.twitter.scalding.serialization._ + +abstract class UnsafeOrderedSerialization[T] extends OrderedSerialization[T] { + // This will write out the interior data as a blob with no prepended length + // This means binary compare cannot skip on this data. + // However the contract remains that one should be able to _read_ the data + // back out again. + def unsafeWrite(out: java.io.OutputStream, t: T): Unit + // This is an internal read method that matches the unsafe write + // importantly any outer length header added to enable skipping isn't included here + def unsafeRead(in: java.io.InputStream): T + + // This is an inner binary compare that the user must supply + def unsafeSwitchingCompareBinaryNoConsume(inputStreamA: InputStream, inputStreamB: InputStream, shouldConsume: Boolean): Int + + // This is the public write method, if we need to inject a size head of the object + // this is where we do it! + final override def write(into: OutputStream, e: T): Try[Unit] = + try { + unsafeWrite(into, e) + Serialization.successUnit + } catch { + case NonFatal(e) => + Failure(e) + } + + final def read(in: InputStream): Try[T] = + try { + Success(unsafeRead(in)) + } catch { + case NonFatal(e) => + Failure(e) + } + + override def compareBinaryNoConsume(inputStreamA: InputStream, inputStreamB: InputStream): OrderedSerialization.Result = + try { + val r = unsafeSwitchingCompareBinaryNoConsume(inputStreamA, inputStreamB, false) + OrderedSerialization.resultFrom(r) + } catch { + case NonFatal(e) => + OrderedSerialization.CompareFailure(e) + } + + override def compareBinary(inputStreamA: InputStream, inputStreamB: InputStream): OrderedSerialization.Result = + try { + val r = unsafeSwitchingCompareBinaryNoConsume(inputStreamA, inputStreamB, true) + OrderedSerialization.resultFrom(r) + } catch { + case NonFatal(e) => + OrderedSerialization.CompareFailure(e) + } +} diff --git a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala index 787717ce92..003007bfe7 100644 --- a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala +++ b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala @@ -401,115 +401,115 @@ class MacroOrderingProperties def noOrderedSerialization[T](implicit ev: OrderedSerialization[T] = null) = assert(ev === null, "Expected unable to produce OrderedSerialization") - test("Test out Unit") { - BinaryOrdering.ordSer[Unit] - check[Unit] - checkMany[Unit] - } - test("Test out Boolean") { - BinaryOrdering.ordSer[Boolean] - check[Boolean] - } - test("Test out jl.Boolean") { - implicit val a = arbMap { b: Boolean => - java.lang.Boolean.valueOf(b) - } - check[java.lang.Boolean] - } - test("Test out Byte") { check[Byte] } - test("Test out jl.Byte") { - implicit val a = arbMap { b: Byte => - java.lang.Byte.valueOf(b) - } - check[java.lang.Byte] - checkCollisions[java.lang.Byte] - } - test("Test out Short") { check[Short] } - test("Test out jl.Short") { - implicit val a = arbMap { b: Short => - java.lang.Short.valueOf(b) - } - check[java.lang.Short] - checkCollisions[java.lang.Short] - } - test("Test out Char") { check[Char] } - test("Test out jl.Char") { - implicit val a = arbMap { b: Char => - java.lang.Character.valueOf(b) - } - check[java.lang.Character] - checkCollisions[java.lang.Character] - } - test("Test out Int") { - BinaryOrdering.ordSer[Int] - check[Int] - checkMany[Int] - checkCollisions[Int] - } - - test("Test out AnyVal of String") { - import TestCC._ - check[TestCaseClassE] - checkMany[TestCaseClassE] - checkCollisions[TestCaseClassE] - } - - test("Test out Tuple of AnyVal's of String") { - import TestCC._ - BinaryOrdering.ordSer[(TestCaseClassE, TestCaseClassE)] - check[(TestCaseClassE, TestCaseClassE)] - checkMany[(TestCaseClassE, TestCaseClassE)] - checkCollisions[(TestCaseClassE, TestCaseClassE)] - } - - test("Test out Tuple of TestSealedAbstractClass") { - import TestCC._ - BinaryOrdering.ordSer[TestSealedAbstractClass] - check[TestSealedAbstractClass] - checkMany[TestSealedAbstractClass] - checkCollisions[TestSealedAbstractClass] - } - - test("Test out jl.Integer") { - implicit val a = arbMap { b: Int => - java.lang.Integer.valueOf(b) - } - check[java.lang.Integer] - checkCollisions[java.lang.Integer] - - } - test("Test out Float") { check[Float] } - test("Test out jl.Float") { - implicit val a = arbMap { b: Float => - java.lang.Float.valueOf(b) - } - check[java.lang.Float] - checkCollisions[java.lang.Float] - } - test("Test out Long") { check[Long] } - test("Test out jl.Long") { - implicit val a = arbMap { b: Long => - java.lang.Long.valueOf(b) - } - check[java.lang.Long] - checkCollisions[java.lang.Long] - } - test("Test out Double") { check[Double] } - test("Test out jl.Double") { - implicit val a = arbMap { b: Double => - java.lang.Double.valueOf(b) - } - check[java.lang.Double] - checkCollisions[java.lang.Double] - } - - test("Test out String") { - BinaryOrdering.ordSer[String] - - check[String] - checkMany[String] - checkCollisions[String] - } + // test("Test out Unit") { + // BinaryOrdering.ordSer[Unit] + // check[Unit] + // checkMany[Unit] + // } + // test("Test out Boolean") { + // BinaryOrdering.ordSer[Boolean] + // check[Boolean] + // } + // test("Test out jl.Boolean") { + // implicit val a = arbMap { b: Boolean => + // java.lang.Boolean.valueOf(b) + // } + // check[java.lang.Boolean] + // } + // test("Test out Byte") { check[Byte] } + // test("Test out jl.Byte") { + // implicit val a = arbMap { b: Byte => + // java.lang.Byte.valueOf(b) + // } + // check[java.lang.Byte] + // checkCollisions[java.lang.Byte] + // } + // test("Test out Short") { check[Short] } + // test("Test out jl.Short") { + // implicit val a = arbMap { b: Short => + // java.lang.Short.valueOf(b) + // } + // check[java.lang.Short] + // checkCollisions[java.lang.Short] + // } + // test("Test out Char") { check[Char] } + // test("Test out jl.Char") { + // implicit val a = arbMap { b: Char => + // java.lang.Character.valueOf(b) + // } + // check[java.lang.Character] + // checkCollisions[java.lang.Character] + // } + // test("Test out Int") { + // BinaryOrdering.ordSer[Int] + // check[Int] + // checkMany[Int] + // checkCollisions[Int] + // } + + // test("Test out AnyVal of String") { + // import TestCC._ + // check[TestCaseClassE] + // checkMany[TestCaseClassE] + // checkCollisions[TestCaseClassE] + // } + + // test("Test out Tuple of AnyVal's of String") { + // import TestCC._ + // BinaryOrdering.ordSer[(TestCaseClassE, TestCaseClassE)] + // check[(TestCaseClassE, TestCaseClassE)] + // checkMany[(TestCaseClassE, TestCaseClassE)] + // checkCollisions[(TestCaseClassE, TestCaseClassE)] + // } + + // test("Test out Tuple of TestSealedAbstractClass") { + // import TestCC._ + // BinaryOrdering.ordSer[TestSealedAbstractClass] + // check[TestSealedAbstractClass] + // checkMany[TestSealedAbstractClass] + // checkCollisions[TestSealedAbstractClass] + // } + + // test("Test out jl.Integer") { + // implicit val a = arbMap { b: Int => + // java.lang.Integer.valueOf(b) + // } + // check[java.lang.Integer] + // checkCollisions[java.lang.Integer] + + // } + // test("Test out Float") { check[Float] } + // test("Test out jl.Float") { + // implicit val a = arbMap { b: Float => + // java.lang.Float.valueOf(b) + // } + // check[java.lang.Float] + // checkCollisions[java.lang.Float] + // } + // test("Test out Long") { check[Long] } + // test("Test out jl.Long") { + // implicit val a = arbMap { b: Long => + // java.lang.Long.valueOf(b) + // } + // check[java.lang.Long] + // checkCollisions[java.lang.Long] + // } + // test("Test out Double") { check[Double] } + // test("Test out jl.Double") { + // implicit val a = arbMap { b: Double => + // java.lang.Double.valueOf(b) + // } + // check[java.lang.Double] + // checkCollisions[java.lang.Double] + // } + + // test("Test out String") { + // BinaryOrdering.ordSer[String] + + // check[String] + // checkMany[String] + // checkCollisions[String] + // } test("Test out ByteBuffer") { implicitly[OrderedSerialization[ByteBuffer]] @@ -517,350 +517,350 @@ class MacroOrderingProperties checkCollisions[ByteBuffer] } - test("Test out List[Float]") { - BinaryOrdering.ordSer[List[Float]] - check[List[Float]] - checkCollisions[List[Float]] - } - test("Test out Queue[Int]") { - implicit val isa = collectionArb[Queue, Int] - BinaryOrdering.ordSer[Queue[Int]] - check[Queue[Int]] - checkCollisions[Queue[Int]] - } - test("Test out IndexedSeq[Int]") { - implicit val isa = collectionArb[IndexedSeq, Int] - BinaryOrdering.ordSer[IndexedSeq[Int]] - check[IndexedSeq[Int]] - checkCollisions[IndexedSeq[Int]] - } - test("Test out HashSet[Int]") { - import scala.collection.immutable.HashSet - implicit val isa = collectionArb[HashSet, Int] - BinaryOrdering.ordSer[HashSet[Int]] - check[HashSet[Int]] - checkCollisions[HashSet[Int]] - } - test("Test out ListSet[Int]") { - import scala.collection.immutable.ListSet - implicit val isa = collectionArb[ListSet, Int] - BinaryOrdering.ordSer[ListSet[Int]] - check[ListSet[Int]] - checkCollisions[ListSet[Int]] - } - - test("Test out List[String]") { - BinaryOrdering.ordSer[List[String]] - check[List[String]] - checkCollisions[List[String]] - } - - test("Test out List[List[String]]") { - val oBuf = BinaryOrdering.ordSer[List[List[String]]] - assert(oBuf.dynamicSize(List(List("sdf"))) === None) - check[List[List[String]]] - checkCollisions[List[List[String]]] - } - - test("Test out List[Int]") { - BinaryOrdering.ordSer[List[Int]] - check[List[Int]] - checkCollisions[List[Int]] - } - - test("Test out SetAlias") { - BinaryOrdering.ordSer[SetAlias] - check[SetAlias] - checkCollisions[SetAlias] - } - - test("Container.InnerCaseClass") { - BinaryOrdering.ordSer[Container.InnerCaseClass] - check[Container.InnerCaseClass] - checkCollisions[Container.InnerCaseClass] - } - - test("Test out Seq[Int]") { - BinaryOrdering.ordSer[Seq[Int]] - check[Seq[Int]] - checkCollisions[Seq[Int]] - } - test("Test out scala.collection.Seq[Int]") { - BinaryOrdering.ordSer[scala.collection.Seq[Int]] - check[scala.collection.Seq[Int]] - checkCollisions[scala.collection.Seq[Int]] - } - - test("Test out Array[Byte]") { - BinaryOrdering.ordSer[Array[Byte]] - check[Array[Byte]] - checkCollisions[Array[Byte]] - } - - test("Test out Vector[Int]") { - BinaryOrdering.ordSer[Vector[Int]] - check[Vector[Int]] - checkCollisions[Vector[Int]] - } - - test("Test out Iterable[Int]") { - BinaryOrdering.ordSer[Iterable[Int]] - check[Iterable[Int]] - checkCollisions[Iterable[Int]] - } - - test("Test out Set[Int]") { - BinaryOrdering.ordSer[Set[Int]] - check[Set[Int]] - checkCollisions[Set[Int]] - } - - test("Test out Set[Double]") { - BinaryOrdering.ordSer[Set[Double]] - check[Set[Double]] - checkCollisions[Set[Double]] - } - - test("Test out Map[Long, Set[Int]]") { - BinaryOrdering.ordSer[Map[Long, Set[Int]]] - check[Map[Long, Set[Int]]] - val c = List(Map(9223372036854775807L -> Set[Int]()), Map(-1L -> Set[Int](-2043106012))) - checkManyExplicit(c.map { i => - (i, i) - }) - checkMany[Map[Long, Set[Int]]] - checkCollisions[Map[Long, Set[Int]]] - } - - test("Test out Map[Long, Long]") { - BinaryOrdering.ordSer[Map[Long, Long]] - check[Map[Long, Long]] - checkCollisions[Map[Long, Long]] - } - test("Test out HashMap[Long, Long]") { - import scala.collection.immutable.HashMap - implicit val isa = - Arbitrary(implicitly[Arbitrary[List[(Long, Long)]]].arbitrary.map(HashMap(_: _*))) - BinaryOrdering.ordSer[HashMap[Long, Long]] - check[HashMap[Long, Long]] - checkCollisions[HashMap[Long, Long]] - } - test("Test out ListMap[Long, Long]") { - import scala.collection.immutable.ListMap - implicit val isa = - Arbitrary(implicitly[Arbitrary[List[(Long, Long)]]].arbitrary.map(ListMap(_: _*))) - BinaryOrdering.ordSer[ListMap[Long, Long]] - check[ListMap[Long, Long]] - checkCollisions[ListMap[Long, Long]] - } - - test("Test out comparing Maps(3->2, 2->3) and Maps(2->3, 3->2) ") { - val a = Map(3 -> 2, 2 -> 3) - val b = Map(2 -> 3, 3 -> 2) - checkWithInputs(a, b) - checkAreSame(a, b) - } - - test("Test out comparing Set(\"asdf\", \"jkl\") and Set(\"jkl\", \"asdf\")") { - val a = Set("asdf", "jkl") - val b = Set("jkl", "asdf") - checkWithInputs(a, b) - checkAreSame(a, b) - } - - test("Test known hard String Case") { - val a = "6" - val b = "곆" - val ord = Ordering.String - assert(rawCompare(a, b) === ord.compare(a, b).signum, "Raw and in memory compares match.") - - val c = List( - "榴㉕⊟풠湜ᙬ覹ꜻ裧뚐⠂覝쫨塢䇺楠谭픚ᐌ轮뺷Ⱟ洦擄黏著탅ﮓꆋ숷梸傠ァ蹵窥轲闇涡飽ꌳ䝞慙擃", - "堒凳媨쉏떽㶥⾽샣井ㆠᇗ裉깴辫࠷᤭塈䎙寫㸉ᶴ䰄똇䡷䥞㷗䷱赫懓䷏剆祲ᝯ졑쐯헢鷴ӕ秔㽰ퟡ㏉鶖奚㙰银䮌ᕗ膾买씋썴행䣈丶偝쾕鐗쇊ኋ넥︇瞤䋗噯邧⹆♣ἷ铆玼⪷沕辤ᠥ⥰箼䔄◗", - "騰쓢堷뛭ᣣﰩ嚲ﲯ㤑ᐜ檊೦⠩奯ᓩ윇롇러ᕰెꡩ璞﫼᭵礀閮䈦椄뾪ɔ믻䖔᪆嬽フ鶬曭꣍ᆏ灖㐸뗋ㆃ녵ퟸ겵晬礙㇩䫓ᘞ昑싨", - "좃ఱ䨻綛糔唄࿁劸酊᫵橻쩳괊筆ݓ淤숪輡斋靑耜঄骐冠㝑⧠떅漫곡祈䵾ᳺ줵됵↲搸虂㔢Ꝅ芆٠풐쮋炞哙⨗쾄톄멛癔짍避쇜畾㣕剼⫁়╢ꅢ澛氌ᄚ㍠ꃫᛔ匙㜗詇閦單錖⒅瘧崥", - "獌癚畇") - checkManyExplicit(c.map { i => - (i, i) - }) - - val c2 = List("聸", "") - checkManyExplicit(c2.map { i => - (i, i) - }) - } - - test("Test out Option[Int]") { - val oser = BinaryOrdering.ordSer[Option[Int]] - - assert(oser.staticSize === None, "can't get the size statically") - check[Option[Int]] - checkMany[Option[Int]] - checkCollisions[Option[Int]] - } - - test("Test out Option[String]") { - BinaryOrdering.ordSer[Option[String]] - - check[Option[String]] - checkMany[Option[String]] - checkCollisions[Option[String]] - } - - test("Test Either[Int, Option[Int]]") { - val oser = BinaryOrdering.ordSer[Either[Int, Option[Int]]] - assert(oser.staticSize === None, "can't get the size statically") - check[Either[Int, Option[Int]]] - checkCollisions[Either[Int, Option[Int]]] - } - test("Test Either[Int, String]") { - val oser = BinaryOrdering.ordSer[Either[Int, String]] - assert(oser.staticSize === None, "can't get the size statically") - assert( - Some(Serialization.toBytes[Either[Int, String]](Left(1)).length) === oser.dynamicSize( - Left(1)), - "serialization size matches dynamic size") - check[Either[Int, String]] - checkCollisions[Either[Int, String]] - } - test("Test Either[Int, Int]") { - val oser = BinaryOrdering.ordSer[Either[Int, Int]] - assert(oser.staticSize === Some(5), "can get the size statically") - check[Either[Int, Int]] - checkCollisions[Either[Int, Int]] - } - test("Test Either[String, Int]") { - BinaryOrdering.ordSer[Either[String, Int]] - check[Either[String, Int]] - checkCollisions[Either[String, Int]] - } - test("Test Either[String, String]") { - BinaryOrdering.ordSer[Either[String, String]] - check[Either[String, String]] - checkCollisions[Either[String, String]] - } - - test("Test out Option[Option[Int]]") { - BinaryOrdering.ordSer[Option[Option[Int]]] - - check[Option[Option[Int]]] - checkCollisions[Option[Option[Int]]] - } - - test("test product like TestCC") { - checkMany[(Int, Char, Long, Option[Int], Double, Option[String])] - checkCollisions[(Int, Char, Long, Option[Int], Double, Option[String])] - } - - test("test specific tuple aa1") { - BinaryOrdering.ordSer[(String, Option[Int], String)] - - checkMany[(String, Option[Int], String)] - checkCollisions[(String, Option[Int], String)] - } - - test("test specific tuple 2") { - check[(String, Option[Int], String)] - checkCollisions[(String, Option[Int], String)] - } - - test("test specific tuple 3") { - val c = List( - ("", None, ""), - ("a", Some(1), "b")) - checkManyExplicit(c.map { i => - (i, i) - }) - } - - test("Test out TestCC") { - import TestCC._ - BinaryOrdering.ordSer[TestCC] - check[TestCC] - checkMany[TestCC] - checkCollisions[TestCC] - } - - test("Test out Sealed Trait") { - import TestCC._ - BinaryOrdering.ordSer[SealedTraitTest] - check[SealedTraitTest] - checkMany[SealedTraitTest] - checkCollisions[SealedTraitTest] - } - - test("Test out Sealed TestCaseHardA") { - import TestCC._ - BinaryOrdering.ordSer[TestCaseHardA] - check[TestCaseHardA] - checkMany[TestCaseHardA] - checkCollisions[TestCaseHardA] - } - - test("Test out Sealed TestCaseHardB") { - import TestCC._ - - implicit val v: OrderedSerialization[ContainerP] = - OrderedSerialization.viaTransform(_.id, ContainerP.fromId) - - BinaryOrdering.ordSer[TestCaseHardB] - check[TestCaseHardB] - checkMany[TestCaseHardB] - checkCollisions[TestCaseHardB] - } - - test("Test out CaseObject") { - import TestCC._ - BinaryOrdering.ordSer[TestObjectE.type] - check[TestObjectE.type] - checkMany[TestObjectE.type] - } - - test("Test out (Int, Int)") { - BinaryOrdering.ordSer[(Int, Int)] - check[(Int, Int)] - checkCollisions[(Int, Int)] - } - - test("Test out (String, Option[Int], String)") { - BinaryOrdering.ordSer[(String, Option[Int], String)] - check[(String, Option[Int], String)] - checkCollisions[(String, Option[Int], String)] - } - - test("Test out MyData") { - import MyData._ - BinaryOrdering.ordSer[MyData] - check[MyData] - checkCollisions[MyData] - } - - test("Test out MacroOpaqueContainer") { - // This will test for things which our macros can't view themselves, so need to use an implicit to let the user provide instead. - // by itself should just work from its own implicits - implicitly[OrderedSerialization[MacroOpaqueContainer]] - - // Put inside a tuple2 to test that - BinaryOrdering.ordSer[(MacroOpaqueContainer, MacroOpaqueContainer)] - check[(MacroOpaqueContainer, MacroOpaqueContainer)] - checkCollisions[(MacroOpaqueContainer, MacroOpaqueContainer)] - check[Option[MacroOpaqueContainer]] - checkCollisions[Option[MacroOpaqueContainer]] - check[List[MacroOpaqueContainer]] - checkCollisions[List[MacroOpaqueContainer]] - } - - test("Does not produce ordering for large sealed trait") { - noOrderedSerialization[BigTrait] - } - - def fn[A]( - implicit or: OrderedSerialization[A]): OrderedSerialization[TypedParameterCaseClass[A]] = - BinaryOrdering.ordSer[TypedParameterCaseClass[A]] - - test("Test out MacroOpaqueContainer inside a case class as an abstract type") { - fn[MacroOpaqueContainer] - BinaryOrdering.ordSer[(MacroOpaqueContainer, MacroOpaqueContainer)] - () - } + // test("Test out List[Float]") { + // BinaryOrdering.ordSer[List[Float]] + // check[List[Float]] + // checkCollisions[List[Float]] + // } + // test("Test out Queue[Int]") { + // implicit val isa = collectionArb[Queue, Int] + // BinaryOrdering.ordSer[Queue[Int]] + // check[Queue[Int]] + // checkCollisions[Queue[Int]] + // } + // test("Test out IndexedSeq[Int]") { + // implicit val isa = collectionArb[IndexedSeq, Int] + // BinaryOrdering.ordSer[IndexedSeq[Int]] + // check[IndexedSeq[Int]] + // checkCollisions[IndexedSeq[Int]] + // } + // test("Test out HashSet[Int]") { + // import scala.collection.immutable.HashSet + // implicit val isa = collectionArb[HashSet, Int] + // BinaryOrdering.ordSer[HashSet[Int]] + // check[HashSet[Int]] + // checkCollisions[HashSet[Int]] + // } + // test("Test out ListSet[Int]") { + // import scala.collection.immutable.ListSet + // implicit val isa = collectionArb[ListSet, Int] + // BinaryOrdering.ordSer[ListSet[Int]] + // check[ListSet[Int]] + // checkCollisions[ListSet[Int]] + // } + + // test("Test out List[String]") { + // BinaryOrdering.ordSer[List[String]] + // check[List[String]] + // checkCollisions[List[String]] + // } + + // test("Test out List[List[String]]") { + // val oBuf = BinaryOrdering.ordSer[List[List[String]]] + // assert(oBuf.dynamicSize(List(List("sdf"))) === None) + // check[List[List[String]]] + // checkCollisions[List[List[String]]] + // } + + // test("Test out List[Int]") { + // BinaryOrdering.ordSer[List[Int]] + // check[List[Int]] + // checkCollisions[List[Int]] + // } + + // test("Test out SetAlias") { + // BinaryOrdering.ordSer[SetAlias] + // check[SetAlias] + // checkCollisions[SetAlias] + // } + + // test("Container.InnerCaseClass") { + // BinaryOrdering.ordSer[Container.InnerCaseClass] + // check[Container.InnerCaseClass] + // checkCollisions[Container.InnerCaseClass] + // } + + // test("Test out Seq[Int]") { + // BinaryOrdering.ordSer[Seq[Int]] + // check[Seq[Int]] + // checkCollisions[Seq[Int]] + // } + // test("Test out scala.collection.Seq[Int]") { + // BinaryOrdering.ordSer[scala.collection.Seq[Int]] + // check[scala.collection.Seq[Int]] + // checkCollisions[scala.collection.Seq[Int]] + // } + + // test("Test out Array[Byte]") { + // BinaryOrdering.ordSer[Array[Byte]] + // check[Array[Byte]] + // checkCollisions[Array[Byte]] + // } + + // test("Test out Vector[Int]") { + // BinaryOrdering.ordSer[Vector[Int]] + // check[Vector[Int]] + // checkCollisions[Vector[Int]] + // } + + // test("Test out Iterable[Int]") { + // BinaryOrdering.ordSer[Iterable[Int]] + // check[Iterable[Int]] + // checkCollisions[Iterable[Int]] + // } + + // test("Test out Set[Int]") { + // BinaryOrdering.ordSer[Set[Int]] + // check[Set[Int]] + // checkCollisions[Set[Int]] + // } + + // test("Test out Set[Double]") { + // BinaryOrdering.ordSer[Set[Double]] + // check[Set[Double]] + // checkCollisions[Set[Double]] + // } + + // test("Test out Map[Long, Set[Int]]") { + // BinaryOrdering.ordSer[Map[Long, Set[Int]]] + // check[Map[Long, Set[Int]]] + // val c = List(Map(9223372036854775807L -> Set[Int]()), Map(-1L -> Set[Int](-2043106012))) + // checkManyExplicit(c.map { i => + // (i, i) + // }) + // checkMany[Map[Long, Set[Int]]] + // checkCollisions[Map[Long, Set[Int]]] + // } + + // test("Test out Map[Long, Long]") { + // BinaryOrdering.ordSer[Map[Long, Long]] + // check[Map[Long, Long]] + // checkCollisions[Map[Long, Long]] + // } + // test("Test out HashMap[Long, Long]") { + // import scala.collection.immutable.HashMap + // implicit val isa = + // Arbitrary(implicitly[Arbitrary[List[(Long, Long)]]].arbitrary.map(HashMap(_: _*))) + // BinaryOrdering.ordSer[HashMap[Long, Long]] + // check[HashMap[Long, Long]] + // checkCollisions[HashMap[Long, Long]] + // } + // test("Test out ListMap[Long, Long]") { + // import scala.collection.immutable.ListMap + // implicit val isa = + // Arbitrary(implicitly[Arbitrary[List[(Long, Long)]]].arbitrary.map(ListMap(_: _*))) + // BinaryOrdering.ordSer[ListMap[Long, Long]] + // check[ListMap[Long, Long]] + // checkCollisions[ListMap[Long, Long]] + // } + + // test("Test out comparing Maps(3->2, 2->3) and Maps(2->3, 3->2) ") { + // val a = Map(3 -> 2, 2 -> 3) + // val b = Map(2 -> 3, 3 -> 2) + // checkWithInputs(a, b) + // checkAreSame(a, b) + // } + + // test("Test out comparing Set(\"asdf\", \"jkl\") and Set(\"jkl\", \"asdf\")") { + // val a = Set("asdf", "jkl") + // val b = Set("jkl", "asdf") + // checkWithInputs(a, b) + // checkAreSame(a, b) + // } + + // test("Test known hard String Case") { + // val a = "6" + // val b = "곆" + // val ord = Ordering.String + // assert(rawCompare(a, b) === ord.compare(a, b).signum, "Raw and in memory compares match.") + + // val c = List( + // "榴㉕⊟풠湜ᙬ覹ꜻ裧뚐⠂覝쫨塢䇺楠谭픚ᐌ轮뺷Ⱟ洦擄黏著탅ﮓꆋ숷梸傠ァ蹵窥轲闇涡飽ꌳ䝞慙擃", + // "堒凳媨쉏떽㶥⾽샣井ㆠᇗ裉깴辫࠷᤭塈䎙寫㸉ᶴ䰄똇䡷䥞㷗䷱赫懓䷏剆祲ᝯ졑쐯헢鷴ӕ秔㽰ퟡ㏉鶖奚㙰银䮌ᕗ膾买씋썴행䣈丶偝쾕鐗쇊ኋ넥︇瞤䋗噯邧⹆♣ἷ铆玼⪷沕辤ᠥ⥰箼䔄◗", + // "騰쓢堷뛭ᣣﰩ嚲ﲯ㤑ᐜ檊೦⠩奯ᓩ윇롇러ᕰెꡩ璞﫼᭵礀閮䈦椄뾪ɔ믻䖔᪆嬽フ鶬曭꣍ᆏ灖㐸뗋ㆃ녵ퟸ겵晬礙㇩䫓ᘞ昑싨", + // "좃ఱ䨻綛糔唄࿁劸酊᫵橻쩳괊筆ݓ淤숪輡斋靑耜঄骐冠㝑⧠떅漫곡祈䵾ᳺ줵됵↲搸虂㔢Ꝅ芆٠풐쮋炞哙⨗쾄톄멛癔짍避쇜畾㣕剼⫁়╢ꅢ澛氌ᄚ㍠ꃫᛔ匙㜗詇閦單錖⒅瘧崥", + // "獌癚畇") + // checkManyExplicit(c.map { i => + // (i, i) + // }) + + // val c2 = List("聸", "") + // checkManyExplicit(c2.map { i => + // (i, i) + // }) + // } + + // test("Test out Option[Int]") { + // val oser = BinaryOrdering.ordSer[Option[Int]] + + // assert(oser.staticSize === None, "can't get the size statically") + // check[Option[Int]] + // checkMany[Option[Int]] + // checkCollisions[Option[Int]] + // } + + // test("Test out Option[String]") { + // BinaryOrdering.ordSer[Option[String]] + + // check[Option[String]] + // checkMany[Option[String]] + // checkCollisions[Option[String]] + // } + + // test("Test Either[Int, Option[Int]]") { + // val oser = BinaryOrdering.ordSer[Either[Int, Option[Int]]] + // assert(oser.staticSize === None, "can't get the size statically") + // check[Either[Int, Option[Int]]] + // checkCollisions[Either[Int, Option[Int]]] + // } + // test("Test Either[Int, String]") { + // val oser = BinaryOrdering.ordSer[Either[Int, String]] + // assert(oser.staticSize === None, "can't get the size statically") + // assert( + // Some(Serialization.toBytes[Either[Int, String]](Left(1)).length) === oser.dynamicSize( + // Left(1)), + // "serialization size matches dynamic size") + // check[Either[Int, String]] + // checkCollisions[Either[Int, String]] + // } + // test("Test Either[Int, Int]") { + // val oser = BinaryOrdering.ordSer[Either[Int, Int]] + // assert(oser.staticSize === Some(5), "can get the size statically") + // check[Either[Int, Int]] + // checkCollisions[Either[Int, Int]] + // } + // test("Test Either[String, Int]") { + // BinaryOrdering.ordSer[Either[String, Int]] + // check[Either[String, Int]] + // checkCollisions[Either[String, Int]] + // } + // test("Test Either[String, String]") { + // BinaryOrdering.ordSer[Either[String, String]] + // check[Either[String, String]] + // checkCollisions[Either[String, String]] + // } + + // test("Test out Option[Option[Int]]") { + // BinaryOrdering.ordSer[Option[Option[Int]]] + + // check[Option[Option[Int]]] + // checkCollisions[Option[Option[Int]]] + // } + + // test("test product like TestCC") { + // checkMany[(Int, Char, Long, Option[Int], Double, Option[String])] + // checkCollisions[(Int, Char, Long, Option[Int], Double, Option[String])] + // } + + // test("test specific tuple aa1") { + // BinaryOrdering.ordSer[(String, Option[Int], String)] + + // checkMany[(String, Option[Int], String)] + // checkCollisions[(String, Option[Int], String)] + // } + + // test("test specific tuple 2") { + // check[(String, Option[Int], String)] + // checkCollisions[(String, Option[Int], String)] + // } + + // test("test specific tuple 3") { + // val c = List( + // ("", None, ""), + // ("a", Some(1), "b")) + // checkManyExplicit(c.map { i => + // (i, i) + // }) + // } + + // test("Test out TestCC") { + // import TestCC._ + // BinaryOrdering.ordSer[TestCC] + // check[TestCC] + // checkMany[TestCC] + // checkCollisions[TestCC] + // } + + // test("Test out Sealed Trait") { + // import TestCC._ + // BinaryOrdering.ordSer[SealedTraitTest] + // check[SealedTraitTest] + // checkMany[SealedTraitTest] + // checkCollisions[SealedTraitTest] + // } + + // test("Test out Sealed TestCaseHardA") { + // import TestCC._ + // BinaryOrdering.ordSer[TestCaseHardA] + // check[TestCaseHardA] + // checkMany[TestCaseHardA] + // checkCollisions[TestCaseHardA] + // } + + // test("Test out Sealed TestCaseHardB") { + // import TestCC._ + + // implicit val v: OrderedSerialization[ContainerP] = + // OrderedSerialization.viaTransform(_.id, ContainerP.fromId) + + // BinaryOrdering.ordSer[TestCaseHardB] + // check[TestCaseHardB] + // checkMany[TestCaseHardB] + // checkCollisions[TestCaseHardB] + // } + + // test("Test out CaseObject") { + // import TestCC._ + // BinaryOrdering.ordSer[TestObjectE.type] + // check[TestObjectE.type] + // checkMany[TestObjectE.type] + // } + + // test("Test out (Int, Int)") { + // BinaryOrdering.ordSer[(Int, Int)] + // check[(Int, Int)] + // checkCollisions[(Int, Int)] + // } + + // test("Test out (String, Option[Int], String)") { + // BinaryOrdering.ordSer[(String, Option[Int], String)] + // check[(String, Option[Int], String)] + // checkCollisions[(String, Option[Int], String)] + // } + + // test("Test out MyData") { + // import MyData._ + // BinaryOrdering.ordSer[MyData] + // check[MyData] + // checkCollisions[MyData] + // } + + // test("Test out MacroOpaqueContainer") { + // // This will test for things which our macros can't view themselves, so need to use an implicit to let the user provide instead. + // // by itself should just work from its own implicits + // implicitly[OrderedSerialization[MacroOpaqueContainer]] + + // // Put inside a tuple2 to test that + // BinaryOrdering.ordSer[(MacroOpaqueContainer, MacroOpaqueContainer)] + // check[(MacroOpaqueContainer, MacroOpaqueContainer)] + // checkCollisions[(MacroOpaqueContainer, MacroOpaqueContainer)] + // check[Option[MacroOpaqueContainer]] + // checkCollisions[Option[MacroOpaqueContainer]] + // check[List[MacroOpaqueContainer]] + // checkCollisions[List[MacroOpaqueContainer]] + // } + + // test("Does not produce ordering for large sealed trait") { + // noOrderedSerialization[BigTrait] + // } + + // def fn[A]( + // implicit or: OrderedSerialization[A]): OrderedSerialization[TypedParameterCaseClass[A]] = + // BinaryOrdering.ordSer[TypedParameterCaseClass[A]] + + // test("Test out MacroOpaqueContainer inside a case class as an abstract type") { + // fn[MacroOpaqueContainer] + // BinaryOrdering.ordSer[(MacroOpaqueContainer, MacroOpaqueContainer)] + // () + // } } From fa386af10b57e3842230623d5aacc757efbe80f6 Mon Sep 17 00:00:00 2001 From: ianoc Date: Tue, 10 Oct 2017 20:53:30 -0700 Subject: [PATCH 6/6] Add either ordered buf --- .../impl/OrderedBufferableProviderImpl.scala | 2 - .../providers/EitherOrderedBuf.scala | 189 ------------------ .../providers/SealedTraitOrderedBuf.scala | 2 +- .../provided/UnsafeCompareBinary.scala | 21 +- .../serialization/provided/package.scala | 1 + .../macros/MacroOrderingProperties.scala | 12 +- 6 files changed, 27 insertions(+), 200 deletions(-) delete mode 100644 scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/ordered_serialization/providers/EitherOrderedBuf.scala diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/OrderedBufferableProviderImpl.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/OrderedBufferableProviderImpl.scala index 4b698d5939..74e7dc7652 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/OrderedBufferableProviderImpl.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/OrderedBufferableProviderImpl.scala @@ -34,7 +34,6 @@ object OrderedSerializationProviderImpl { val primitiveDispatcher = PrimitiveOrderedBuf.dispatch(c) val optionDispatcher = OptionOrderedBuf.dispatch(c)(buildDispatcher) - val eitherDispatcher = EitherOrderedBuf.dispatch(c)(buildDispatcher) val caseClassDispatcher = CaseClassOrderedBuf.dispatch(c)(buildDispatcher) val caseObjectDispatcher = CaseObjectOrderedBuf.dispatch(c) val productDispatcher = ProductOrderedBuf.dispatch(c)(buildDispatcher) @@ -48,7 +47,6 @@ object OrderedSerializationProviderImpl { .orElse(primitiveDispatcher) .orElse(unitDispatcher) .orElse(optionDispatcher) - .orElse(eitherDispatcher) .orElse(stringDispatcher) .orElse(traversablesDispatcher) .orElse(caseClassDispatcher) diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/ordered_serialization/providers/EitherOrderedBuf.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/ordered_serialization/providers/EitherOrderedBuf.scala deleted file mode 100644 index af564142c9..0000000000 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/ordered_serialization/providers/EitherOrderedBuf.scala +++ /dev/null @@ -1,189 +0,0 @@ -/* - Copyright 2015 Twitter, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ -package com.twitter.scalding.serialization.macros.impl.ordered_serialization.providers - -import scala.language.experimental.macros -import scala.reflect.macros.blackbox.Context - -import com.twitter.scalding._ -import com.twitter.scalding.serialization.macros.impl.ordered_serialization.{ - CompileTimeLengthTypes, - ProductLike, - TreeOrderedBuf -} -import CompileTimeLengthTypes._ -import com.twitter.scalding.serialization.OrderedSerialization - -object EitherOrderedBuf { - def dispatch(c: Context)(buildDispatcher: => PartialFunction[c.Type, TreeOrderedBuf[c.type]]): PartialFunction[c.Type, TreeOrderedBuf[c.type]] = { - case tpe if tpe.erasure =:= c.universe.typeOf[Either[Any, Any]] => - EitherOrderedBuf(c)(buildDispatcher, tpe) - } - - def apply(c: Context)(buildDispatcher: => PartialFunction[c.Type, TreeOrderedBuf[c.type]], - outerType: c.Type): TreeOrderedBuf[c.type] = { - import c.universe._ - def freshT(id: String) = TermName(c.freshName(id)) - val dispatcher = buildDispatcher - - val leftType = outerType.asInstanceOf[TypeRefApi].args(0) // linter:ignore - val rightType = outerType.asInstanceOf[TypeRefApi].args(1) - val leftBuf: TreeOrderedBuf[c.type] = dispatcher(leftType) - val rightBuf: TreeOrderedBuf[c.type] = dispatcher(rightType) - - def genBinaryCompare(inputStreamA: TermName, inputStreamB: TermName) = { - val valueOfA = freshT("valueOfA") - val valueOfB = freshT("valueOfB") - val tmpHolder = freshT("tmpHolder") - q""" - val $valueOfA = $inputStreamA.readByte - val $valueOfB = $inputStreamB.readByte - val $tmpHolder = _root_.java.lang.Byte.compare($valueOfA, $valueOfB) - if($tmpHolder != 0) { - //they are different, return comparison on type - $tmpHolder - } else if($valueOfA == (0: _root_.scala.Byte)) { - // they are both Left: - ${leftBuf.compareBinary(inputStreamA, inputStreamB)} - } else { - // they are both Right: - ${rightBuf.compareBinary(inputStreamA, inputStreamB)} - } - """ - } - - def genHashFn(element: TermName) = { - val innerValue = freshT("innerValue") - q""" - if($element.isLeft) { - val $innerValue = $element.left.get - val x = ${leftBuf.hash(innerValue)} - // x * (2^31 - 1) which is a mersenne prime - (x << 31) - x - } - else { - val $innerValue = $element.right.get - // x * (2^19 - 1) which is a mersenne prime - val x = ${rightBuf.hash(innerValue)} - (x << 19) - x - } - """ - } - - def genGetFn(inputStreamA: TermName) = { - val tmpGetHolder = freshT("tmpGetHolder") - q""" - val $tmpGetHolder = $inputStreamA.readByte - if($tmpGetHolder == (0: _root_.scala.Byte)) Left(${leftBuf.get(inputStreamA)}) - else Right(${rightBuf.get(inputStreamA)}) - """ - } - - def genPutFn(inputStream: TermName, element: TermName) = { - val tmpPutVal = freshT("tmpPutVal") - val innerValue = freshT("innerValue") - q""" - if($element.isRight) { - $inputStream.writeByte(1: _root_.scala.Byte) - val $innerValue = $element.right.get - ${rightBuf.put(inputStream, innerValue)} - } else { - $inputStream.writeByte(0: _root_.scala.Byte) - val $innerValue = $element.left.get - ${leftBuf.put(inputStream, innerValue)} - } - """ - } - - def genCompareFn(elementA: TermName, elementB: TermName) = { - val aIsRight = freshT("aIsRight") - val bIsRight = freshT("bIsRight") - val innerValueA = freshT("innerValueA") - val innerValueB = freshT("innerValueB") - q""" - val $aIsRight = $elementA.isRight - val $bIsRight = $elementB.isRight - if(!$aIsRight) { - if (!$bIsRight) { - val $innerValueA = $elementA.left.get - val $innerValueB = $elementB.left.get - ${leftBuf.compare(innerValueA, innerValueB)} - } - else -1 // Left(_) < Right(_) - } - else { - if(!$bIsRight) 1 // Right(_) > Left(_) - else { // both are right - val $innerValueA = $elementA.right.get - val $innerValueB = $elementB.right.get - ${rightBuf.compare(innerValueA, innerValueB)} - } - } - """ - } - - new TreeOrderedBuf[c.type] { - override val ctx: c.type = c - override val tpe = outerType - override def compareBinary(inputStreamA: TermName, inputStreamB: TermName) = - genBinaryCompare(inputStreamA, inputStreamB) - override def hash(element: TermName): ctx.Tree = genHashFn(element) - override def put(inputStream: TermName, element: TermName) = genPutFn(inputStream, element) - override def get(inputStreamA: TermName): ctx.Tree = genGetFn(inputStreamA) - override def compare(elementA: TermName, elementB: TermName): ctx.Tree = - genCompareFn(elementA, elementB) - override val lazyOuterVariables: Map[String, ctx.Tree] = - rightBuf.lazyOuterVariables ++ leftBuf.lazyOuterVariables - override def length(element: Tree): CompileTimeLengthTypes[c.type] = { - - def tree(ctl: CompileTimeLengthTypes[_]): c.Tree = - ctl.asInstanceOf[CompileTimeLengthTypes[c.type]].t - val dyn = - q"""_root_.com.twitter.scalding.serialization.macros.impl.ordered_serialization.runtime_helpers.DynamicLen""" - - (leftBuf.length(q"$element.left.get"), rightBuf.length(q"$element.right.get")) match { - case (lconst: ConstantLengthCalculation[_], rconst: ConstantLengthCalculation[_]) if lconst.toInt == rconst.toInt => - // We got lucky, they are the same size: - ConstantLengthCalculation(c)(1 + rconst.toInt) - case (_: NoLengthCalculationAvailable[_], _) => NoLengthCalculationAvailable(c) - case (_, _: NoLengthCalculationAvailable[_]) => NoLengthCalculationAvailable(c) - case (left: MaybeLengthCalculation[_], right: MaybeLengthCalculation[_]) => - MaybeLengthCalculation(c)(q""" - if ($element.isLeft) { ${tree(left)} + $dyn(1) } - else { ${tree(right)} + $dyn(1) } - """) - case (left: MaybeLengthCalculation[_], right) => - MaybeLengthCalculation(c)(q""" - if ($element.isLeft) { ${tree(left)} + $dyn(1) } - else { $dyn(${tree(right)}) + $dyn(1) } - """) - case (left, right: MaybeLengthCalculation[_]) => - MaybeLengthCalculation(c)(q""" - if ($element.isLeft) { $dyn(${tree(left)}) + $dyn(1) } - else { ${tree(right)} + $dyn(1) } - """) - // Rest are constant, but different values or fast. So the result is fast - case (left, right) => - // They are different sizes. :( - FastLengthCalculation(c)(q""" - if($element.isLeft) { 1 + ${tree(left)} } - else { 1 + ${tree(right)} } - """) - } - } - } - } -} diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/ordered_serialization/providers/SealedTraitOrderedBuf.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/ordered_serialization/providers/SealedTraitOrderedBuf.scala index 42f589d7d5..72a0056c74 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/ordered_serialization/providers/SealedTraitOrderedBuf.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/macros/impl/ordered_serialization/providers/SealedTraitOrderedBuf.scala @@ -25,7 +25,7 @@ object SealedTraitOrderedBuf { import c.universe._ val pf: PartialFunction[c.Type, TreeOrderedBuf[c.type]] = { - case tpe if (tpe.typeSymbol.isClass && (tpe.typeSymbol.asClass.isAbstractClass || tpe.typeSymbol.asClass.isTrait)) => + case tpe if (tpe.typeSymbol.isClass && (tpe.typeSymbol.asClass.isAbstractClass || tpe.typeSymbol.asClass.isTrait)) && !(tpe.erasure =:= c.universe.typeOf[Either[Any, Any]]) => SealedTraitOrderedBuf(c)(buildDispatcher, tpe) } pf diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/UnsafeCompareBinary.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/UnsafeCompareBinary.scala index c444edf54d..6d85bb12df 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/UnsafeCompareBinary.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/UnsafeCompareBinary.scala @@ -22,15 +22,32 @@ import scala.util.{ Failure, Success, Try } import scala.util.control.NonFatal import com.twitter.scalding.serialization._ +object UnsafeOrderedSerialization { + def apply[T](ordSer: OrderedSerialization[T]): UnsafeOrderedSerialization[T] = ordSer match { + case us: UnsafeOrderedSerialization[T] => us + case o => + new UnsafeOrderedSerialization[T] { + def unsafeWrite(out: OutputStream, t: T): Unit = ordSer.write(out, t).get + def unsafeRead(in: InputStream): T = ordSer.read(in).get + def unsafeSwitchingCompareBinaryNoConsume(inputStreamA: InputStream, inputStreamB: InputStream, shouldConsume: Boolean): Int = + ordSer.compareBinary(inputStreamA, inputStreamB).unsafeToInt + def hash(t: T) = ordSer.hash(t) + def compare(a: T, b: T) = ordSer.compare(a, b) + @inline def staticSize: Option[Int] = ordSer.staticSize + def dynamicSize(t: T): Option[Int] = ordSer.dynamicSize(t) + } + } +} + abstract class UnsafeOrderedSerialization[T] extends OrderedSerialization[T] { // This will write out the interior data as a blob with no prepended length // This means binary compare cannot skip on this data. // However the contract remains that one should be able to _read_ the data // back out again. - def unsafeWrite(out: java.io.OutputStream, t: T): Unit + def unsafeWrite(out: OutputStream, t: T): Unit // This is an internal read method that matches the unsafe write // importantly any outer length header added to enable skipping isn't included here - def unsafeRead(in: java.io.InputStream): T + def unsafeRead(in: InputStream): T // This is an inner binary compare that the user must supply def unsafeSwitchingCompareBinaryNoConsume(inputStreamA: InputStream, inputStreamB: InputStream, shouldConsume: Boolean): Int diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/package.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/package.scala index 529edceb10..72431bca0c 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/package.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/provided/package.scala @@ -6,4 +6,5 @@ import com.twitter.scalding.serialization.provided.{ OrderedSerializationByteBuf package object provided { implicit val byteBufferOrderedSerialization: Exported[OrderedSerialization[ByteBuffer]] = Exported(OrderedSerializationByteBuffer) + implicit def eitherOrderedOrderedSerialization[L: OrderedSerialization, R: OrderedSerialization]: Exported[OrderedSerialization[Either[L, R]]] = Exported(EitherOrderedSerialization()) } diff --git a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala index 003007bfe7..e8c93353c6 100644 --- a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala +++ b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/macros/MacroOrderingProperties.scala @@ -723,12 +723,12 @@ class MacroOrderingProperties // check[Either[Int, String]] // checkCollisions[Either[Int, String]] // } - // test("Test Either[Int, Int]") { - // val oser = BinaryOrdering.ordSer[Either[Int, Int]] - // assert(oser.staticSize === Some(5), "can get the size statically") - // check[Either[Int, Int]] - // checkCollisions[Either[Int, Int]] - // } + test("Test Either[Int, Int]") { + val oser = implicitly[OrderedSerialization[Either[Int, Int]]] + assert(oser.staticSize === Some(5), "can get the size statically") + check[Either[Int, Int]] + checkCollisions[Either[Int, Int]] + } // test("Test Either[String, Int]") { // BinaryOrdering.ordSer[Either[String, Int]] // check[Either[String, Int]]