From e90edc18a1d25e9dffa94a1909cebeebd4058dd0 Mon Sep 17 00:00:00 2001 From: Joe Nievelt Date: Thu, 14 May 2015 13:18:29 -0700 Subject: [PATCH 01/72] Prepare for release of 0.14.0 --- CHANGES.md | 46 +++++++++++++++++++ README.md | 2 +- project/Build.scala | 2 +- .../src/main/scala/com/twitter/package.scala | 2 +- .../ExecutionTutorial.scala | 2 +- version.sbt | 2 +- 6 files changed, 51 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a6bdf9a5d0..9d066bc5b2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,51 @@ # Scalding # +### Version 0.14.0 ### +* add .unit to Execution object #1189 +* Override hashCode for Args #1190 +* Put a value in an exception message #1191 +* Add an exclusiveUpper method to DateRange #1194 +* Convert LzoTextDelimited to Cascading scheme. #1179 +* Remove Travis IRC notifications #1200 +* add LookupJoin and LookupJoinTest changes from summingbird #1199 +* Add a new ExecutionApp tutorial #1196 +* Move main simple example to be the typed API, and put the .'s at the sta... #1193 +* Add Execution.withArgs #1205 +* Config/Cascading updater #1197 +* Remove algebird serializers #1206 +* remove warnings in CumulativeSum #1215 +* Implicit execution context / easier switching between modes #1113 +* add row l1 normalize #1214 +* provide Args as an implicit val #1219 +* call sourceConfInit when reading from taps in local mode #1228 +* Add distinctCount and distinctValues helper methods to KeyedList.
#1232 +* import hygiene: remove unused imports and remove JavaConversions use #1239 +* Swap hash and filename for filename-extension-sensitive code #1243 +* Remove more unused imports #1240 +* Provide useHdfsLocalMode for an easy switch to mapreduce local mode #1244 +* upgrade scalacheck and scalatest #1246 +* Optimize string and (hopefully) number comparisons a bit #1241 +* Note the active FlowProcess for Joiners #1235 +* Make sure Executions are executed at most once #1253 +* Fix Config.getUniqueIDs #1254 +* Add MustHasReducers trait. #1252 +* Make sure the EvalCache thread isDaemon #1255 +* Use non-regex split function #1251 +* make InputSizeReducerEstimator work for any CompositeTap #1256 +* TimePathedSource helper methods #1257 +* Fix for reducer estimation not working correctly if withReducers is set to 1 reducer #1263 +* Add make(dest) to TypedPipe #1217 +* Fix SimpleDateFormat caching by default #1265 +* upgrade sbt and sbt launcher script #1270 +* Add TypedPipeDiff for comparing typed pipes #1266 +* Change separator from \1 to \u0001 #1271 +* Disable reducer estimation for map-only steps #1276 +* Local sources support multiple paths #1275 +* fix the spelling of the cumulativeSumTest file #1281 +* Hydrate both sides of sampledCounts in skewJoinWithSmaller #1278 +* Bijection 0.8.0, algebird 0.10.0, chill 0.6.0, scala 2.10.5 #1287 +* Remove some deprecated items #1288 + ### Version 0.13.1 ### * Back out 4 changes to be binary compatible: https://github.com/twitter/scalding/pull/1187 * Use java.util.Random instead of scala.util.Random: https://github.com/twitter/scalding/pull/1186 diff --git a/README.md b/README.md index fc1b882d4d..595d146b82 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs. 
![Scalding Logo](https://raw.github.com/twitter/scalding/develop/logo/scalding.png) -Current version: `0.13.1` +Current version: `0.14.0` ## Word Count diff --git a/project/Build.scala b/project/Build.scala index 737b4ba60d..4079e594b5 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -224,7 +224,7 @@ object ScaldingBuild extends Build { Some(subProj) .filterNot(unreleasedModules.contains(_)) .map { - s => "com.twitter" % ("scalding-" + s + "_2.10") % "0.13.0" + s => "com.twitter" % ("scalding-" + s + "_2.10") % "0.14.0" } def module(name: String) = { diff --git a/scalding-core/src/main/scala/com/twitter/package.scala b/scalding-core/src/main/scala/com/twitter/package.scala index f712d2d353..45904fe714 100644 --- a/scalding-core/src/main/scala/com/twitter/package.scala +++ b/scalding-core/src/main/scala/com/twitter/package.scala @@ -34,7 +34,7 @@ package object scalding { /** * Make sure this is in sync with version.sbt */ - val scaldingVersion: String = "0.13.1" + val scaldingVersion: String = "0.14.0" object RichPathFilter { implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f) diff --git a/tutorial/execution-tutorial/ExecutionTutorial.scala b/tutorial/execution-tutorial/ExecutionTutorial.scala index f27ee2da8e..b4b57345ea 100644 --- a/tutorial/execution-tutorial/ExecutionTutorial.scala +++ b/tutorial/execution-tutorial/ExecutionTutorial.scala @@ -30,7 +30,7 @@ To test it, first build the assembly jar from root directory: ./sbt execution-tutorial/assembly Run: - scala -classpath tutorial/execution-tutorial/target/execution-tutorial-assembly-0.13.1.jar \ + scala -classpath tutorial/execution-tutorial/target/execution-tutorial-assembly-0.14.0.jar \ com.twitter.scalding.tutorial.MyExecJob --local \ --input tutorial/data/hello.txt \ --output tutorial/data/execution_output.txt diff --git a/version.sbt b/version.sbt index beb9953b43..501eb5530c 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.13.1" +version 
in ThisBuild := "0.14.0" From 2d5aa36139655a0ece5baf83c3665cf9bc1d9e2c Mon Sep 17 00:00:00 2001 From: Ian O'Connell Date: Tue, 2 Feb 2016 08:04:52 -0800 Subject: [PATCH 02/72] Merge pull request #1488 from rubanm/rubanm/drop_parquet_cascading Remove dependency on parquet-cascading --- .travis.blacklist | 4 +- project/Build.scala | 53 ++++-- .../src/test/resources/test.thrift | 7 + .../parquet/scrooge/ParquetScroogeScheme.java | 2 +- .../scrooge/Parquet346ScroogeScheme.scala | 4 +- .../scrooge/ParquetScroogeSchemeTest.java | 2 +- .../scalding/parquet/ParquetValueScheme.java | 166 +++++++++++++++++ .../parquet/thrift/ParquetTBaseScheme.java | 64 +++++++ .../parquet/tuple/ParquetTupleConverter.java | 92 ++++++++++ .../parquet/tuple/ParquetTupleScheme.java | 173 ++++++++++++++++++ .../parquet/tuple/SchemaIntersection.java | 45 +++++ .../parquet/tuple/TupleReadSupport.java | 60 ++++++ .../tuple/TupleRecordMaterializer.java | 27 +++ .../parquet/tuple/TupleWriteSupport.java | 94 ++++++++++ .../thrift/Parquet346TBaseScheme.scala | 4 +- .../parquet/thrift/ParquetThrift.scala | 9 +- .../scalding/parquet/tuple/ParquetTuple.scala | 3 +- .../thrift/TestParquetTBaseScheme.java | 168 +++++++++++++++++ .../parquet/tuple/TestParquetTupleScheme.java | 165 +++++++++++++++++ scalding-parquet/src/test/resources/names.txt | 3 + 20 files changed, 1123 insertions(+), 22 deletions(-) create mode 100644 scalding-parquet-fixtures/src/test/resources/test.thrift create mode 100644 scalding-parquet/src/main/java/com/twitter/scalding/parquet/ParquetValueScheme.java create mode 100644 scalding-parquet/src/main/java/com/twitter/scalding/parquet/thrift/ParquetTBaseScheme.java create mode 100644 scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverter.java create mode 100644 scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleScheme.java create mode 100644 
scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/SchemaIntersection.java create mode 100644 scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/TupleReadSupport.java create mode 100644 scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/TupleRecordMaterializer.java create mode 100644 scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/TupleWriteSupport.java create mode 100644 scalding-parquet/src/test/java/com/twitter/scalding/parquet/thrift/TestParquetTBaseScheme.java create mode 100644 scalding-parquet/src/test/java/com/twitter/scalding/parquet/tuple/TestParquetTupleScheme.java create mode 100644 scalding-parquet/src/test/resources/names.txt diff --git a/.travis.blacklist b/.travis.blacklist index 62bf89b1be..a30e311bf7 100644 --- a/.travis.blacklist +++ b/.travis.blacklist @@ -5,5 +5,7 @@ scalding-benchmarks # These are just for fixtures, so blacklist for 2.10 and 2.11 scalding-thrift-macros-fixtures scalding-thrift-macros-fixtures +scalding-parquet-fixtures +scalding-parquet-fixtures +scalding-parquet-scrooge-fixtures scalding-parquet-scrooge-fixtures -scalding-parquet-scrooge-fixtures \ No newline at end of file diff --git a/project/Build.scala b/project/Build.scala index faaec2e027..d21c382c60 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -14,6 +14,8 @@ import scala.collection.JavaConverters._ object ScaldingBuild extends Build { + import ScroogeSBT.autoImport._ + def scalaBinaryVersion(scalaVersion: String) = scalaVersion match { case version if version startsWith "2.10" => "2.10" case version if version startsWith "2.11" => "2.11" @@ -22,6 +24,7 @@ object ScaldingBuild extends Build { def isScala210x(scalaVersion: String) = scalaBinaryVersion(scalaVersion) == "2.10" val algebirdVersion = "0.11.0" + val apacheCommonsVersion = "2.2" val avroVersion = "1.7.4" val bijectionVersion = "0.8.1" val cascadingAvroVersion = "2.1.2" @@ -347,10 +350,38 @@ object ScaldingBuild extends Build { ) 
).dependsOn(scaldingCore) + lazy val scaldingParquetFixtures = module("parquet-fixtures") + .settings(ScroogeSBT.newSettings:_*) + .settings( + scroogeThriftSourceFolder in Test <<= baseDirectory { + base => base / "src/test/resources" + }, + sourceGenerators in Test <+= ( + streams, + scroogeThriftSources in Test, + scroogeIsDirty in Test, + sourceManaged + ).map { (out, sources, isDirty, outputDir) => + // for some reason, sbt sometimes calls us multiple times, often with no source files. + if (isDirty && sources.nonEmpty) { + out.log.info("Generating scrooge thrift for %s ...".format(sources.mkString(", "))) + ScroogeSBT.compile(out.log, outputDir, sources.toSet, Set(), Map(), "java", Set("--language", "java")) + } + (outputDir ** "*.java").get.toSeq + }, + libraryDependencies ++= Seq( + "com.twitter" %% "scrooge-serializer" % scroogeVersion % "provided", + "commons-lang" % "commons-lang" % apacheCommonsVersion, // needed for HashCodeBuilder used in thriftjava + "org.apache.thrift" % "libthrift" % thriftVersion + ) + ) + lazy val scaldingParquet = module("parquet").settings( libraryDependencies <++= (scalaVersion) { scalaVersion => Seq( + "org.apache.parquet" % "parquet-column" % parquetVersion, + "org.apache.parquet" % "parquet-hadoop" % parquetVersion, + "org.apache.parquet" % "parquet-thrift" % parquetVersion // see https://issues.apache.org/jira/browse/PARQUET-143 for exclusions - "org.apache.parquet" % "parquet-cascading" % parquetVersion exclude("org.apache.parquet", "parquet-pig") exclude("com.twitter.elephantbird", "elephant-bird-pig") exclude("com.twitter.elephantbird", "elephant-bird-core"), @@ -359,12 +390,11 @@ object ScaldingBuild extends Build { "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", "org.scala-lang" % "scala-reflect" % scalaVersion, "com.twitter" %% "bijection-macros" % bijectionVersion, - "com.twitter" %% "chill-bijection" % chillVersion + "com.twitter" %% "chill-bijection" % chillVersion, + "com.twitter.elephantbird" 
% "elephant-bird-core" % elephantbirdVersion % "test" ) ++ (if(isScala210x(scalaVersion)) Seq("org.scalamacros" %% "quasiquotes" % quasiquotesVersion) else Seq()) }, addCompilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full)) - .dependsOn(scaldingCore, scaldingHadoopTest % "test") - - import ScroogeSBT.autoImport._ + .dependsOn(scaldingCore, scaldingHadoopTest % "test", scaldingParquetFixtures % "test->test") lazy val scaldingParquetScroogeFixtures = module("parquet-scrooge-fixtures") .settings(ScroogeSBT.newSettings:_*) @@ -387,25 +417,24 @@ object ScaldingBuild extends Build { }, libraryDependencies ++= Seq( "com.twitter" %% "scrooge-serializer" % scroogeVersion % "provided", + "commons-lang" % "commons-lang" % apacheCommonsVersion, // needed for HashCodeBuilder used in thriftjava "org.apache.thrift" % "libthrift" % thriftVersion + ) ) - ) lazy val scaldingParquetScrooge = module("parquet-scrooge") .settings( libraryDependencies ++= Seq( + "org.slf4j" % "slf4j-api" % slf4jVersion, // see https://issues.apache.org/jira/browse/PARQUET-143 for exclusions - "org.apache.parquet" % "parquet-cascading" % parquetVersion + "org.apache.parquet" % "parquet-thrift" % parquetVersion % "test" classifier "tests" exclude("org.apache.parquet", "parquet-pig") exclude("com.twitter.elephantbird", "elephant-bird-pig") exclude("com.twitter.elephantbird", "elephant-bird-core"), - "org.slf4j" % "slf4j-api" % slf4jVersion, - "org.apache.parquet" % "parquet-thrift" % parquetVersion % "test" classifier "tests", - "com.twitter" %% "scrooge-serializer" % scroogeVersion, + "com.twitter" %% "scrooge-serializer" % scroogeVersion, "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", "com.novocode" % "junit-interface" % "0.11" % "test", "junit" % "junit" % junitVersion % "test" - ) ).dependsOn(scaldingCore, scaldingParquet % "compile->compile;test->test", scaldingParquetScroogeFixtures % "test->test") @@ -564,7 +593,7 @@ object ScaldingBuild extends 
Build { addCompilerPlugin("org.scalamacros" % "paradise" % "2.0.1" cross CrossVersion.full) ).dependsOn(scaldingCore) -lazy val scaldingThriftMacrosFixtures = module("thrift-macros-fixtures") + lazy val scaldingThriftMacrosFixtures = module("thrift-macros-fixtures") .settings(ScroogeSBT.newSettings:_*) .settings( scroogeThriftSourceFolder in Test <<= baseDirectory { diff --git a/scalding-parquet-fixtures/src/test/resources/test.thrift b/scalding-parquet-fixtures/src/test/resources/test.thrift new file mode 100644 index 0000000000..f30e354571 --- /dev/null +++ b/scalding-parquet-fixtures/src/test/resources/test.thrift @@ -0,0 +1,7 @@ +namespace java com.twitter.scalding.parquet.thrift_java.test +#@namespace scala com.twitter.scalding.parquet.thrift_scala.test + +struct Name { + 1: required string first_name, + 2: optional string last_name +} diff --git a/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeScheme.java b/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeScheme.java index a4ef0bb2f6..cff692b347 100644 --- a/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeScheme.java +++ b/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeScheme.java @@ -22,11 +22,11 @@ import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; +import com.twitter.scalding.parquet.ParquetValueScheme; import com.twitter.scrooge.ThriftStruct; import cascading.flow.FlowProcess; import cascading.tap.Tap; -import org.apache.parquet.cascading.ParquetValueScheme; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.parquet.hadoop.ParquetOutputFormat; diff --git a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/Parquet346ScroogeScheme.scala 
b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/Parquet346ScroogeScheme.scala index fe1444f222..afc15ebc6b 100644 --- a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/Parquet346ScroogeScheme.scala +++ b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/Parquet346ScroogeScheme.scala @@ -2,10 +2,10 @@ package com.twitter.scalding.parquet.scrooge import cascading.flow.FlowProcess import cascading.tap.Tap +import com.twitter.scalding.parquet.ParquetValueScheme import com.twitter.scalding.parquet.thrift.Parquet346StructTypeRepairer import com.twitter.scrooge.{ ThriftStruct, ThriftStructCodec } import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader } -import org.apache.parquet.cascading.ParquetValueScheme import org.apache.parquet.hadoop.thrift.ThriftReadSupport import org.apache.parquet.schema.MessageType import org.apache.parquet.thrift.struct.ThriftType.StructType @@ -83,4 +83,4 @@ class Parquet346ScroogeRecordConverter[T <: ThriftStruct](thriftClass: Class[T], // this is the fix -- we add in the missing structOrUnionType metadata // before passing it along - Parquet346StructTypeRepairer.repair(thriftType)) \ No newline at end of file + Parquet346StructTypeRepairer.repair(thriftType)) diff --git a/scalding-parquet-scrooge/src/test/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeSchemeTest.java b/scalding-parquet-scrooge/src/test/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeSchemeTest.java index 4055da43b6..bd626832ea 100644 --- a/scalding-parquet-scrooge/src/test/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeSchemeTest.java +++ b/scalding-parquet-scrooge/src/test/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeSchemeTest.java @@ -45,9 +45,9 @@ import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TIOStreamTransport; import org.junit.Test; -import 
org.apache.parquet.cascading.ParquetValueScheme.Config; import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter; import org.apache.parquet.hadoop.util.ContextUtil; +import com.twitter.scalding.parquet.ParquetValueScheme.Config; import com.twitter.scalding.parquet.scrooge.thrift_scala.test.TestPersonWithAllInformation; import com.twitter.scalding.parquet.scrooge.thrift_java.test.Address; import com.twitter.scalding.parquet.scrooge.thrift_java.test.Phone; diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/ParquetValueScheme.java b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/ParquetValueScheme.java new file mode 100644 index 0000000000..2d71c44896 --- /dev/null +++ b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/ParquetValueScheme.java @@ -0,0 +1,166 @@ +package com.twitter.scalding.parquet; + +import java.io.IOException; +import java.io.Serializable; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.mapred.Container; +import org.apache.parquet.hadoop.thrift.ParquetThriftInputFormat; +import org.apache.parquet.hadoop.thrift.ThriftReadSupport; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * A Cascading Scheme that returns a simple Tuple with a single value, the "value" object + * coming out of the underlying InputFormat. + * + * This is an abstract class; implementations are expected to set up their Input/Output Formats + * correctly in the respective Init methods. 
+ */ +public abstract class ParquetValueScheme extends Scheme{ + + public static final class Config implements Serializable { + private final FilterPredicate filterPredicate; + private final String deprecatedProjectionString; + private final String strictProjectionString; + private final Class klass; + + private Config(Class klass, FilterPredicate filterPredicate, String deprecatedProjectionString, String strictProjectionString) { + this.filterPredicate = filterPredicate; + this.deprecatedProjectionString = deprecatedProjectionString; + this.strictProjectionString = strictProjectionString; + this.klass = klass; + } + + public Config() { + filterPredicate = null; + deprecatedProjectionString = null; + strictProjectionString = null; + klass = null; + } + + public FilterPredicate getFilterPredicate() { + return filterPredicate; + } + + @Deprecated + public String getProjectionString() { + return deprecatedProjectionString; + } + + public String getStrictProjectionString() { + return strictProjectionString; + } + + public Class getKlass() { + return klass; + } + + public Config withFilterPredicate(FilterPredicate f) { + return new Config(this.klass, checkNotNull(f, "filterPredicate"), this.deprecatedProjectionString, this.strictProjectionString); + } + + @Deprecated + public Config withProjectionString(String p) { + return new Config(this.klass, this.filterPredicate, checkNotNull(p, "projectionString"), this.strictProjectionString); + } + + public Config withStrictProjectionString(String p) { + return new Config(this.klass, this.filterPredicate, this.deprecatedProjectionString, checkNotNull(p, "projectionString")); + } + + public Config withRecordClass(Class klass) { + return new Config(checkNotNull(klass, "recordClass"), this.filterPredicate, this.deprecatedProjectionString, this.strictProjectionString); + } + } + + private static final long serialVersionUID = 157560846420730043L; + protected final Config config; + + public ParquetValueScheme() { + this(new Config()); 
+ } + + public ParquetValueScheme(FilterPredicate filterPredicate) { + this(new Config().withFilterPredicate(filterPredicate)); + } + + public ParquetValueScheme(Config config) { + this.config = config; + } + + @Deprecated + private void setProjectionPushdown(JobConf jobConf) { + if (this.config.deprecatedProjectionString != null) { + ThriftReadSupport.setProjectionPushdown(jobConf, this.config.deprecatedProjectionString); + } + } + + private void setStrictProjectionPushdown(JobConf jobConf) { + if (this.config.strictProjectionString != null) { + ThriftReadSupport.setStrictFieldProjectionFilter(jobConf, this.config.strictProjectionString); + } + } + + private void setPredicatePushdown(JobConf jobConf) { + if (this.config.filterPredicate != null) { + ParquetInputFormat.setFilterPredicate(jobConf, this.config.filterPredicate); + } + } + @Override + public void sourceConfInit(FlowProcess jobConfFlowProcess, Tap jobConfRecordReaderOutputCollectorTap, final JobConf jobConf) { + setPredicatePushdown(jobConf); + setProjectionPushdown(jobConf); + setStrictProjectionPushdown(jobConf); + setRecordClass(jobConf); + } + + private void setRecordClass(JobConf jobConf) { + if (config.klass != null) { + ParquetThriftInputFormat.setThriftClass(jobConf, config.klass); + } + } + + @SuppressWarnings("unchecked") + @Override + public boolean source(FlowProcess fp, SourceCall sc) + throws IOException { + Container value = (Container) sc.getInput().createValue(); + boolean hasNext = sc.getInput().next(null, value); + if (!hasNext) { return false; } + + // Skip nulls + if (value == null) { return true; } + + sc.getIncomingEntry().setTuple(new Tuple(value.get())); + return true; + } + + @SuppressWarnings("unchecked") + @Override + public void sink(FlowProcess fp, SinkCall sc) + throws IOException { + TupleEntry tuple = sc.getOutgoingEntry(); + + if (tuple.size() != 1) { + throw new RuntimeException("ParquetValueScheme expects tuples with an arity of exactly 1, but found " + 
tuple.getFields()); + } + + T value = (T) tuple.getObject(0); + OutputCollector output = sc.getOutput(); + output.collect(null, value); + } + +} diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/thrift/ParquetTBaseScheme.java b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/thrift/ParquetTBaseScheme.java new file mode 100644 index 0000000000..d62596b98d --- /dev/null +++ b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/thrift/ParquetTBaseScheme.java @@ -0,0 +1,64 @@ +package com.twitter.scalding.parquet.thrift; + +import com.twitter.scalding.parquet.ParquetValueScheme; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.thrift.TBase; + +import cascading.flow.FlowProcess; +import cascading.tap.Tap; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat; +import org.apache.parquet.hadoop.thrift.ThriftReadSupport; +import org.apache.parquet.hadoop.thrift.TBaseWriteSupport; +import org.apache.parquet.thrift.TBaseRecordConverter; + +public class ParquetTBaseScheme> extends ParquetValueScheme { + + // In the case of reads, we can read the thrift class from the file metadata + public ParquetTBaseScheme() { + this(new Config()); + } + + public ParquetTBaseScheme(Class thriftClass) { + this(new Config().withRecordClass(thriftClass)); + } + + public ParquetTBaseScheme(FilterPredicate filterPredicate) { + this(new Config().withFilterPredicate(filterPredicate)); + } + + public ParquetTBaseScheme(FilterPredicate filterPredicate, Class thriftClass) { + this(new Config().withRecordClass(thriftClass).withFilterPredicate(filterPredicate)); + } + + public ParquetTBaseScheme(ParquetValueScheme.Config config) { + super(config); + } 
+ + @Override + public void sourceConfInit(FlowProcess fp, + Tap tap, JobConf jobConf) { + super.sourceConfInit(fp, tap, jobConf); + jobConf.setInputFormat(DeprecatedParquetInputFormat.class); + ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class); + ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class); + } + + @Override + public void sinkConfInit(FlowProcess fp, + Tap tap, JobConf jobConf) { + + if (this.config.getKlass() == null) { + throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor"); + } + + DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf); + DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class); + TBaseWriteSupport.setThriftClass(jobConf, this.config.getKlass()); + } +} diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverter.java b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverter.java new file mode 100644 index 0000000000..4f313d7392 --- /dev/null +++ b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverter.java @@ -0,0 +1,92 @@ +package com.twitter.scalding.parquet.tuple; + +import cascading.tuple.Tuple; + +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.Type; + +public class ParquetTupleConverter extends GroupConverter { + + protected Tuple currentTuple; + private final Converter[] converters; + + public ParquetTupleConverter(GroupType parquetSchema) { + int schemaSize = parquetSchema.getFieldCount(); + + this.converters = new Converter[schemaSize]; + for (int i = 0; i < schemaSize; i++) { + Type type = parquetSchema.getType(i); + converters[i] = newConverter(type, i); + 
} + } + + private Converter newConverter(Type type, int i) { + if(!type.isPrimitive()) { + throw new IllegalArgumentException("cascading can only build tuples from primitive types"); + } else { + return new TuplePrimitiveConverter(this, i); + } + } + + @Override + public Converter getConverter(int fieldIndex) { + return converters[fieldIndex]; + } + + @Override + final public void start() { + currentTuple = Tuple.size(converters.length); + } + + @Override + public void end() { + } + + final public Tuple getCurrentTuple() { + return currentTuple; + } + + static final class TuplePrimitiveConverter extends PrimitiveConverter { + private final ParquetTupleConverter parent; + private final int index; + + public TuplePrimitiveConverter(ParquetTupleConverter parent, int index) { + this.parent = parent; + this.index = index; + } + + @Override + public void addBinary(Binary value) { + parent.getCurrentTuple().setString(index, value.toStringUsingUTF8()); + } + + @Override + public void addBoolean(boolean value) { + parent.getCurrentTuple().setBoolean(index, value); + } + + @Override + public void addDouble(double value) { + parent.getCurrentTuple().setDouble(index, value); + } + + @Override + public void addFloat(float value) { + parent.getCurrentTuple().setFloat(index, value); + } + + @Override + public void addInt(int value) { + parent.getCurrentTuple().setInteger(index, value); + } + + @Override + public void addLong(long value) { + parent.getCurrentTuple().setLong(index, value); + } + } +} diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleScheme.java b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleScheme.java new file mode 100644 index 0000000000..203f3cd67c --- /dev/null +++ b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleScheme.java @@ -0,0 +1,173 @@ +package com.twitter.scalding.parquet.tuple; + +import java.io.IOException; +import java.util.List; + +import 
org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.CompositeTap; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tap.hadoop.Hfs; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.apache.parquet.hadoop.mapred.Container; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat; +import org.apache.parquet.schema.MessageType; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * A Cascading Scheme that converts Parquet groups into Cascading tuples. + * If you provide it with sourceFields, it will selectively materialize only the columns for those fields. + * The names must match the names in the Parquet schema. + * If you do not provide sourceFields, or use Fields.ALL or Fields.UNKNOWN, it will create one from the + * Parquet schema. + * Currently, only primitive types are supported. TODO: allow nested fields in the Parquet schema to be + * flattened to a top-level field in the Cascading tuple. 
+ * + * @author Avi Bryant + */ + +public class ParquetTupleScheme extends Scheme{ + + private static final long serialVersionUID = 0L; + private String parquetSchema; + private final FilterPredicate filterPredicate; + + public ParquetTupleScheme() { + super(); + this.filterPredicate = null; + } + + public ParquetTupleScheme(Fields sourceFields) { + super(sourceFields); + this.filterPredicate = null; + } + + public ParquetTupleScheme(FilterPredicate filterPredicate) { + this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate"); + } + + public ParquetTupleScheme(FilterPredicate filterPredicate, Fields sourceFields) { + super(sourceFields); + this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate"); + } + + /** + * ParquetTupleScheme constructor used a sink need to be implemented + * + * @param sourceFields used for the reading step + * @param sinkFields used for the writing step + * @param schema is mandatory if you add sinkFields and needs to be the + * toString() from a MessageType. This value is going to be parsed when the + * parquet file will be created. 
+ */ + public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String schema) { + super(sourceFields, sinkFields); + parquetSchema = schema; + this.filterPredicate = null; + } + + @SuppressWarnings("rawtypes") + @Override + public void sourceConfInit(FlowProcess fp, + Tap tap, JobConf jobConf) { + + if (filterPredicate != null) { + ParquetInputFormat.setFilterPredicate(jobConf, filterPredicate); + } + + jobConf.setInputFormat(DeprecatedParquetInputFormat.class); + ParquetInputFormat.setReadSupportClass(jobConf, TupleReadSupport.class); + TupleReadSupport.setRequestedFields(jobConf, getSourceFields()); + } + + @Override + public Fields retrieveSourceFields(FlowProcess flowProcess, Tap tap) { + MessageType schema = readSchema(flowProcess, tap); + SchemaIntersection intersection = new SchemaIntersection(schema, getSourceFields()); + + setSourceFields(intersection.getSourceFields()); + + return getSourceFields(); + } + + private MessageType readSchema(FlowProcess flowProcess, Tap tap) { + try { + Hfs hfs; + + if( tap instanceof CompositeTap ) + hfs = (Hfs) ( (CompositeTap) tap ).getChildTaps().next(); + else + hfs = (Hfs) tap; + + List