diff --git a/.travis.yml b/.travis.yml index 810d98fc36..2a8c092edd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -36,11 +36,11 @@ matrix: script: "scripts/run_test.sh" - scala: 2.11.8 - env: BUILD="base" TEST_TARGET="scalding-avro scalding-hraven scalding-commons scalding-parquet scalding-parquet-scrooge" + env: BUILD="base" TEST_TARGET="scalding-avro scalding-hraven scalding-commons scalding-parquet scalding-parquet-cascading scalding-parquet-scrooge scalding-parquet-scrooge-cascading" script: "scripts/run_test.sh" - scala: 2.12.1 - env: BUILD="base" TEST_TARGET="scalding-avro scalding-hraven scalding-commons scalding-parquet scalding-parquet-scrooge" + env: BUILD="base" TEST_TARGET="scalding-avro scalding-hraven scalding-commons scalding-parquet scalding-parquet-cascading scalding-parquet-scrooge scalding-parquet-scrooge-cascading" script: "scripts/run_test.sh" - scala: 2.11.8 diff --git a/build.sbt b/build.sbt index d319d0dd49..3b27f140e5 100644 --- a/build.sbt +++ b/build.sbt @@ -23,7 +23,7 @@ val cascadingAvroVersion = "2.1.2" val chillVersion = "0.8.4" val elephantbirdVersion = "4.15" val hadoopLzoVersion = "0.4.19" -val hadoopVersion = "2.5.0" +val hadoopVersion = "2.6.0" val hbaseVersion = "0.94.10" val hravenVersion = "1.0.1" val jacksonVersion = "2.8.7" @@ -38,6 +38,7 @@ val scroogeVersion = "4.12.0" val slf4jVersion = "1.6.6" val thriftVersion = "0.5.0" val junitVersion = "4.10" +val junitInterfaceVersion = "0.11" val macroCompatVersion = "1.1.1" val jlineVersion = "2.14.3" @@ -63,7 +64,7 @@ val sharedSettings = assemblySettings ++ scalariformSettings ++ Seq( "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test", "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test", - "com.novocode" % "junit-interface" % "0.10" % "test" + "com.novocode" % "junit-interface" % junitInterfaceVersion % "test" ), resolvers ++= Seq( @@ -71,7 +72,7 @@ val sharedSettings = assemblySettings ++ scalariformSettings ++ Seq( Opts.resolver.sonatypeSnapshots, Opts.resolver.sonatypeReleases, "Concurrent Maven Repo" at "http://conjars.org/repo", - "Twitter Maven" at "http://maven.twttr.com", + "Twitter Maven" at "https://maven.twttr.com", "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/" ), @@ -217,7 +218,9 @@ lazy val scalding = Project( scaldingCommons, scaldingAvro, scaldingParquet, + scaldingParquetCascading, scaldingParquetScrooge, + scaldingParquetScroogeCascading, scaldingHRaven, scaldingRepl, scaldingJson, @@ -243,7 +246,9 @@ lazy val scaldingAssembly = Project( scaldingCommons, scaldingAvro, scaldingParquet, + scaldingParquetCascading, scaldingParquetScrooge, + scaldingParquetScroogeCascading, scaldingHRaven, scaldingRepl, scaldingJson, @@ -296,10 +301,10 @@ lazy val scaldingDate = module("date") lazy val scaldingGraph = module("graph") lazy val cascadingVersion = - System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "2.6.1") + System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "3.2.1") lazy val cascadingJDBCVersion = - System.getenv.asScala.getOrElse("SCALDING_CASCADING_JDBC_VERSION", "2.6.0") + System.getenv.asScala.getOrElse("SCALDING_CASCADING_JDBC_VERSION", "3.0.0-wip-127") lazy val scaldingBenchmarks = module("benchmarks") .settings( @@ -325,7 +330,8 @@ lazy val scaldingCore = module("core").settings( "com.twitter" %% "bijection-macros" % bijectionVersion, "com.twitter" %% "chill" % chillVersion, "com.twitter" %% "chill-algebird" % chillVersion, - "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", + "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" + exclude("com.google.guava", "guava"), "org.scala-lang" % "scala-library" % scalaVersion.value, "org.scala-lang" % "scala-reflect" % scalaVersion.value, "org.slf4j" % "slf4j-api" % slf4jVersion, @@ -340,11 +346,12 @@ lazy val scaldingCommons = module("commons").settings( "com.twitter" %% "bijection-core" % bijectionVersion, "com.twitter" %% "algebird-core" % algebirdVersion, "com.twitter" %% "chill" % chillVersion, - "com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion, + "com.twitter.elephantbird" % "elephant-bird-cascading3" % elephantbirdVersion, "com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion, "com.hadoop.gplcompression" % "hadoop-lzo" % hadoopLzoVersion, // TODO: split this out into scalding-thrift - "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", + "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" + exclude("com.google.guava", "guava"), "org.apache.thrift" % "libthrift" % thriftVersion, // TODO: split this out into a scalding-scrooge "com.twitter" %% "scrooge-serializer" % scroogeVersion % "provided" @@ -361,6 +368,7 @@ lazy val scaldingAvro = module("avro").settings( "org.apache.avro" % "avro" % avroVersion, "org.slf4j" % "slf4j-api" % slf4jVersion, "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" + exclude("com.google.guava", "guava") ) ).dependsOn(scaldingCore) @@ -376,7 +384,8 @@ lazy val scaldingParquetFixtures = module("parquet-fixtures") ) ) -lazy val scaldingParquet = module("parquet").settings( +// separate target that only depends on parquet, thrift, eb and cascading. Not scalding. +lazy val scaldingParquetCascading = module("parquet-cascading").settings( libraryDependencies ++= Seq( "org.apache.parquet" % "parquet-column" % parquetVersion, "org.apache.parquet" % "parquet-hadoop" % parquetVersion, @@ -385,10 +394,23 @@ lazy val scaldingParquet = module("parquet").settings( exclude("org.apache.parquet", "parquet-pig") exclude("com.twitter.elephantbird", "elephant-bird-pig") exclude("com.twitter.elephantbird", "elephant-bird-core"), + "org.apache.thrift" % "libthrift" % thriftVersion, + "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" + exclude("com.google.guava", "guava"), + "cascading" % "cascading-core" % cascadingVersion % "provided", + "cascading" % "cascading-hadoop" % cascadingVersion % "provided", + "com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion % "test" + ) +).dependsOn(scaldingParquetFixtures % "test->test") + +lazy val scaldingParquet = module("parquet").settings( + libraryDependencies ++= Seq( + "org.apache.parquet" % "parquet-column" % parquetVersion, + "org.apache.parquet" % "parquet-hadoop" % parquetVersion, "org.scala-lang" % "scala-compiler" % scalaVersion.value, - "org.apache.thrift" % "libthrift" % "0.7.0", "org.slf4j" % "slf4j-api" % slf4jVersion, - "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", + "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" + exclude("com.google.guava", "guava"), "org.scala-lang" % "scala-reflect" % scalaVersion.value, "com.twitter" %% "bijection-macros" % bijectionVersion, "com.twitter" %% "chill-bijection" % chillVersion, @@ -396,9 +418,7 @@ lazy val scaldingParquet = module("parquet").settings( "org.typelevel" %% "macro-compat" % macroCompatVersion ), addCompilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full)) - .dependsOn(scaldingCore, scaldingHadoopTest % "test", scaldingParquetFixtures % "test->test") - - + .dependsOn(scaldingCore, scaldingParquetCascading, scaldingHadoopTest % "test", scaldingParquetFixtures % "test->test") lazy val scaldingParquetScroogeFixtures = module("parquet-scrooge-fixtures") .settings( @@ -412,6 +432,25 @@ lazy val scaldingParquetScroogeFixtures = module("parquet-scrooge-fixtures") ) ) +// separate target that only depends on parquet, scrooge, eb and cascading. Not scalding. +lazy val scaldingParquetScroogeCascading = module("parquet-scrooge-cascading") + .settings( + libraryDependencies ++= Seq( + // see https://issues.apache.org/jira/browse/PARQUET-143 for exclusions + "cascading" % "cascading-core" % cascadingVersion % "provided", + "cascading" % "cascading-hadoop" % cascadingVersion % "test", + "org.apache.parquet" % "parquet-thrift" % parquetVersion % "test" classifier "tests" + exclude("org.apache.parquet", "parquet-pig") + exclude("com.twitter.elephantbird", "elephant-bird-pig") + exclude("com.twitter.elephantbird", "elephant-bird-core"), + "com.twitter" %% "scrooge-serializer" % scroogeVersion + exclude("com.google.guava", "guava"), + "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" + exclude("com.google.guava", "guava"), + "junit" % "junit" % junitVersion % "test" + ) +).dependsOn(scaldingParquetCascading % "compile->compile;test->test", scaldingParquetScroogeFixtures % "test->test") + lazy val scaldingParquetScrooge = module("parquet-scrooge") .settings( libraryDependencies ++= Seq( @@ -423,12 +462,13 @@ lazy val scaldingParquetScrooge = module("parquet-scrooge") exclude("com.twitter.elephantbird", "elephant-bird-core"), "com.twitter" %% "scrooge-serializer" % scroogeVersion exclude("com.google.guava", "guava"), - "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", - "com.novocode" % "junit-interface" % "0.11" % "test", + "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" + exclude("com.google.guava", "guava"), + "com.novocode" % "junit-interface" % junitInterfaceVersion % "test", "junit" % "junit" % junitVersion % "test" ) -).dependsOn(scaldingCore, scaldingParquet % "compile->compile;test->test", scaldingParquetScroogeFixtures % "test->test") +).dependsOn(scaldingCore, scaldingParquetScroogeCascading, scaldingParquet % "compile->compile;test->test", scaldingParquetScroogeFixtures % "test->test") lazy val scaldingHRaven = module("hraven").settings( libraryDependencies ++= Seq( @@ -450,6 +490,7 @@ lazy val scaldingHRaven = module("hraven").settings( "org.apache.hbase" % "hbase" % hbaseVersion, "org.slf4j" % "slf4j-api" % slf4jVersion, "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" + exclude("com.google.guava", "guava") ) ).dependsOn(scaldingCore) @@ -471,8 +512,10 @@ lazy val scaldingRepl = module("repl") "jline" % "jline" % jlineVersion, "org.scala-lang" % "scala-compiler" % scalaVersion.value, "org.scala-lang" % "scala-reflect" % scalaVersion.value, - "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", - "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "unprovided", + "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" + exclude("com.google.guava", "guava"), + "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "unprovided" + exclude("com.google.guava", "guava"), "org.slf4j" % "slf4j-api" % slf4jVersion, "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "provided", "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "unprovided" @@ -499,16 +542,18 @@ addCompilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVe lazy val scaldingJson = module("json").settings( libraryDependencies ++= Seq( - "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", + "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" + exclude("com.google.guava", "guava"), "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion, "org.json4s" %% "json4s-native" % json4SVersion, - "com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion % "provided" + "com.twitter.elephantbird" % "elephant-bird-cascading3" % elephantbirdVersion % "provided" ) ).dependsOn(scaldingCore) lazy val scaldingJdbc = module("jdbc").settings( libraryDependencies ++= Seq( - "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", + "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" + exclude("com.google.guava", "guava"), "cascading" % "cascading-jdbc-core" % cascadingJDBCVersion, "cascading" % "cascading-jdbc-mysql" % cascadingJDBCVersion ) @@ -516,7 +561,8 @@ lazy val scaldingJdbc = module("jdbc").settings( lazy val scaldingHadoopTest = module("hadoop-test").settings( libraryDependencies ++= Seq( - "org.apache.hadoop" % "hadoop-client" % hadoopVersion, + "org.apache.hadoop" % "hadoop-client" % hadoopVersion + exclude("com.google.guava", "guava"), "org.apache.hadoop" % "hadoop-minicluster" % hadoopVersion, "org.apache.hadoop" % "hadoop-yarn-server-tests" % hadoopVersion classifier "tests", "org.apache.hadoop" % "hadoop-yarn-server" % hadoopVersion, @@ -533,7 +579,8 @@ lazy val scaldingHadoopTest = module("hadoop-test").settings( lazy val scaldingEstimatorsTest = module("estimators-test").settings( libraryDependencies ++= Seq( - "org.apache.hadoop" % "hadoop-client" % hadoopVersion, + "org.apache.hadoop" % "hadoop-client" % hadoopVersion + exclude("com.google.guava", "guava"), "org.apache.hadoop" % "hadoop-minicluster" % hadoopVersion, "org.apache.hadoop" % "hadoop-yarn-server-tests" % hadoopVersion classifier "tests", "org.apache.hadoop" % "hadoop-yarn-server" % hadoopVersion, @@ -561,9 +608,10 @@ lazy val maple = Project( // Disable cross publishing for this artifact publishArtifact := !scalaVersion.value.startsWith("2.10"), libraryDependencies ++= Seq( - "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", + "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" + exclude("com.google.guava", "guava"), "org.apache.hbase" % "hbase" % hbaseVersion % "provided", - "cascading" % "cascading-hadoop" % cascadingVersion + "cascading" % "cascading-hadoop" % cascadingVersion % "provided" ) ) @@ -576,7 +624,8 @@ lazy val executionTutorial = Project( libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value, "org.scala-lang" % "scala-reflect" % scalaVersion.value, - "org.apache.hadoop" % "hadoop-client" % hadoopVersion, + "org.apache.hadoop" % "hadoop-client" % hadoopVersion + exclude("com.google.guava", "guava"), "org.slf4j" % "slf4j-api" % slf4jVersion, "org.slf4j" % "slf4j-log4j12" % slf4jVersion, "cascading" % "cascading-hadoop" % cascadingVersion @@ -611,10 +660,9 @@ lazy val scaldingThriftMacros = module("thrift-macros") "com.twitter" %% "scrooge-serializer" % scroogeVersion % "provided" exclude("com.google.guava", "guava"), "org.apache.thrift" % "libthrift" % thriftVersion, - "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "test", + "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "test" + exclude("com.google.guava", "guava"), "org.apache.hadoop" % "hadoop-minicluster" % hadoopVersion % "test", - "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "test", - "org.apache.hadoop" % "hadoop-minicluster" % hadoopVersion % "test", "org.apache.hadoop" % "hadoop-yarn-server-tests" % hadoopVersion classifier "tests", "org.apache.hadoop" % "hadoop-yarn-server" % hadoopVersion % "test", "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion classifier "tests", diff --git a/maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java b/maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java index 1aefbbb49f..4b0416b11f 100644 --- a/maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java +++ b/maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java @@ -5,6 +5,7 @@ import java.util.Properties; import java.util.logging.Logger; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; @@ -45,18 +46,18 @@ public class LocalTap extends Tap scheme, + public LocalTap(String path, Scheme scheme, SinkMode sinkMode) { super(new LocalScheme(scheme), sinkMode); setup(path, scheme); } - public LocalTap(String path, Scheme scheme) { + public LocalTap(String path, Scheme scheme) { super(new LocalScheme(scheme)); setup(path, scheme); } - private void setup(String path, Scheme scheme) { + private void setup(String path, Scheme scheme) { this.path = path; /* @@ -90,13 +91,13 @@ public String getIdentifier() { } @Override - public TupleEntryIterator openForRead(FlowProcess flowProcess, RecordReader input) throws IOException { + public TupleEntryIterator openForRead(FlowProcess flowProcess, RecordReader input) throws IOException { JobConf jobConf = mergeDefaults("LocalTap#openForRead", flowProcess.getConfigCopy(), defaults); return lfs.openForRead(new HadoopFlowProcess(jobConf)); } @Override - public TupleEntryCollector openForWrite(FlowProcess flowProcess, OutputCollector output) + public TupleEntryCollector openForWrite(FlowProcess flowProcess, OutputCollector output) throws IOException { JobConf jobConf = mergeDefaults("LocalTap#openForWrite", flowProcess.getConfigCopy(), defaults); return lfs.openForWrite(new HadoopFlowProcess(jobConf)); @@ -141,11 +142,11 @@ private static class LocalScheme extends Scheme { private static final long serialVersionUID = 5710119342340369543L; - private Scheme scheme; + private Scheme scheme; private JobConf defaults; private Lfs lfs; - public LocalScheme(Scheme scheme) { + public LocalScheme(Scheme scheme) { super(scheme.getSourceFields(), scheme.getSinkFields()); this.scheme = scheme; } @@ -159,19 +160,19 @@ private void setLfs(Lfs lfs) { } @Override - public Fields retrieveSourceFields(FlowProcess flowProcess, + public Fields retrieveSourceFields(FlowProcess flowProcess, Tap tap) { return scheme.retrieveSourceFields(new HadoopFlowProcess(defaults), lfs); } @Override - public void presentSourceFields(FlowProcess flowProcess, + public void presentSourceFields(FlowProcess flowProcess, Tap tap, Fields fields) { scheme.presentSourceFields(new HadoopFlowProcess(defaults), lfs, fields); } @Override - public void sourceConfInit(FlowProcess flowProcess, + public void sourceConfInit(FlowProcess flowProcess, Tap tap, Properties conf) { JobConf jobConf = mergeDefaults("LocalScheme#sourceConfInit", conf, defaults); scheme.sourceConfInit(new HadoopFlowProcess(jobConf), lfs, jobConf); @@ -179,19 +180,19 @@ public void sourceConfInit(FlowProcess flowProcess, } @Override - public Fields retrieveSinkFields(FlowProcess flowProcess, + public Fields retrieveSinkFields(FlowProcess flowProcess, Tap tap) { return scheme.retrieveSinkFields(new HadoopFlowProcess(defaults), lfs); } @Override - public void presentSinkFields(FlowProcess flowProcess, + public void presentSinkFields(FlowProcess flowProcess, Tap tap, Fields fields) { scheme.presentSinkFields(new HadoopFlowProcess(defaults), lfs, fields); } @Override - public void sinkConfInit(FlowProcess flowProcess, + public void sinkConfInit(FlowProcess flowProcess, Tap tap, Properties conf) { JobConf jobConf = mergeDefaults("LocalScheme#sinkConfInit", conf, defaults); scheme.sinkConfInit(new HadoopFlowProcess(jobConf), lfs, jobConf); @@ -199,13 +200,13 @@ public void sinkConfInit(FlowProcess flowProcess, } @Override - public boolean source(FlowProcess flowProcess, SourceCall sourceCall) + public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException { throw new RuntimeException("LocalTap#source is never called"); } @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall) + public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { throw new RuntimeException("LocalTap#sink is never called"); } diff --git a/maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java b/maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java index 0f830ede86..6dfa4ff7ff 100644 --- a/maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java +++ b/maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java @@ -154,7 +154,7 @@ public String[] getFamilyNames() { } @Override - public void sourcePrepare(FlowProcess flowProcess, + public void sourcePrepare(FlowProcess flowProcess, SourceCall sourceCall) { Object[] pair = new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()}; @@ -163,13 +163,13 @@ public void sourcePrepare(FlowProcess flowProcess, } @Override - public void sourceCleanup(FlowProcess flowProcess, + public void sourceCleanup(FlowProcess flowProcess, SourceCall sourceCall) { sourceCall.setContext(null); } @Override - public boolean source(FlowProcess flowProcess, + public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException { Tuple result = new Tuple(); @@ -206,7 +206,7 @@ public boolean source(FlowProcess flowProcess, } @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall) + public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); OutputCollector outputCollector = sinkCall.getOutput(); @@ -231,7 +231,7 @@ public void sink(FlowProcess flowProcess, SinkCall process, + public void sinkConfInit(FlowProcess process, Tap tap, JobConf conf) { conf.setOutputFormat(TableOutputFormat.class); @@ -240,7 +240,7 @@ public void sinkConfInit(FlowProcess process, } @Override - public void sourceConfInit(FlowProcess process, + public void sourceConfInit(FlowProcess process, Tap tap, JobConf conf) { conf.setInputFormat(TableInputFormat.class); diff --git a/maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java b/maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java index b7cf671781..b239ebc553 100644 --- a/maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java +++ b/maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java @@ -144,8 +144,8 @@ protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningExceptio } @Override - public void sinkConfInit(FlowProcess process, JobConf conf) { - if (quorumNames != null) { + public void sinkConfInit(FlowProcess process, JobConf conf) { + if(quorumNames != null) { conf.set("hbase.zookeeper.quorum", quorumNames); } else { Configuration hbaseConfig = HBaseConfiguration.create(conf); @@ -183,12 +183,12 @@ public String getIdentifier() { } @Override - public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) throws IOException { + public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) throws IOException { return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); } @Override - public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) throws IOException { + public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) throws IOException { HBaseTapCollector hBaseCollector = new HBaseTapCollector( jobConfFlowProcess, this ); hBaseCollector.prepare(); return hBaseCollector; @@ -235,7 +235,7 @@ public long getModifiedTime(JobConf jobConf) throws IOException { } @Override - public void sourceConfInit(FlowProcess process, JobConf conf) { + public void sourceConfInit(FlowProcess process, JobConf conf) { // a hack for MultiInputFormat to see that there is a child format FileInputFormat.setInputPaths( conf, getPath() ); diff --git a/maple/src/main/java/com/twitter/maple/hbase/HBaseTapCollector.java b/maple/src/main/java/com/twitter/maple/hbase/HBaseTapCollector.java index f5ad1ed2dd..1f726c465a 100644 --- a/maple/src/main/java/com/twitter/maple/hbase/HBaseTapCollector.java +++ b/maple/src/main/java/com/twitter/maple/hbase/HBaseTapCollector.java @@ -42,7 +42,7 @@ public class HBaseTapCollector extends TupleEntrySchemeCollector hadoopFlowProcess; + private final FlowProcess hadoopFlowProcess; /** Field tap */ private final Tap tap; /** Field reporter */ @@ -58,7 +58,7 @@ public class HBaseTapCollector extends TupleEntrySchemeCollector flowProcess, Tap tap) throws IOException { + public HBaseTapCollector(FlowProcess flowProcess, Tap tap) throws IOException { super(flowProcess, tap.getScheme()); this.hadoopFlowProcess = flowProcess; this.tap = tap; diff --git a/maple/src/main/java/com/twitter/maple/tap/MemorySinkTap.java b/maple/src/main/java/com/twitter/maple/tap/MemorySinkTap.java index 6b71b08b5b..f7fc9e21d1 100644 --- a/maple/src/main/java/com/twitter/maple/tap/MemorySinkTap.java +++ b/maple/src/main/java/com/twitter/maple/tap/MemorySinkTap.java @@ -7,7 +7,7 @@ import cascading.tuple.Tuple; import cascading.tuple.TupleEntry; import cascading.tuple.TupleEntryIterator; -import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.conf.Configuration; import java.io.File; import java.io.IOException; @@ -43,7 +43,7 @@ public static String getTempDir() { } @Override - public boolean commitResource(JobConf conf) throws java.io.IOException { + public boolean commitResource(Configuration conf) throws java.io.IOException { TupleEntryIterator it = new HadoopFlowProcess(conf).openTapForRead(this); boolean first_time = true; diff --git a/maple/src/main/java/com/twitter/maple/tap/MemorySourceTap.java b/maple/src/main/java/com/twitter/maple/tap/MemorySourceTap.java index 1d07de3a23..fdff677ff1 100644 --- a/maple/src/main/java/com/twitter/maple/tap/MemorySourceTap.java +++ b/maple/src/main/java/com/twitter/maple/tap/MemorySourceTap.java @@ -46,7 +46,7 @@ public List getTuples() { } @Override - public void sourceConfInit(FlowProcess flowProcess, + public void sourceConfInit(FlowProcess flowProcess, Tap, Void> tap, JobConf conf) { FileInputFormat.setInputPaths(conf, this.id); conf.setInputFormat(TupleMemoryInputFormat.class); @@ -54,13 +54,13 @@ public void sourceConfInit(FlowProcess flowProcess, } @Override - public void sinkConfInit(FlowProcess flowProcess, + public void sinkConfInit(FlowProcess flowProcess, Tap, Void> tap, JobConf conf) { throw new UnsupportedOperationException("Not supported yet."); } @Override - public void sourcePrepare( FlowProcess flowProcess, SourceCall flowProcess, SourceCall> sourceCall ) { sourceCall.setContext( new Object[ 2 ] ); @@ -69,7 +69,7 @@ public void sourcePrepare( FlowProcess flowProcess, SourceCall flowProcess, SourceCall flowProcess, SourceCall> sourceCall) throws IOException { TupleWrapper key = (TupleWrapper) sourceCall.getContext()[ 0 ]; NullWritable value = (NullWritable) sourceCall.getContext()[ 1 ]; @@ -84,13 +84,13 @@ public boolean source(FlowProcess flowProcess, SourceCall flowProcess, SourceCall flowProcess, SourceCall> sourceCall ) { sourceCall.setContext( null ); } @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall ) throws IOException { + public void sink(FlowProcess flowProcess, SinkCall sinkCall ) throws IOException { throw new UnsupportedOperationException("Not supported."); } @@ -127,7 +127,7 @@ public boolean equals(Object object) { } @Override - public TupleEntryIterator openForRead( FlowProcess flowProcess, RecordReader flowProcess, RecordReader input ) throws IOException { // input may be null when this method is called on the client side or cluster side when accumulating // for a HashJoin diff --git a/maple/src/main/java/com/twitter/maple/tap/StdoutTap.java b/maple/src/main/java/com/twitter/maple/tap/StdoutTap.java index 5c3f5f0b29..93c8d4b501 100644 --- a/maple/src/main/java/com/twitter/maple/tap/StdoutTap.java +++ b/maple/src/main/java/com/twitter/maple/tap/StdoutTap.java @@ -5,7 +5,7 @@ import cascading.tap.hadoop.Lfs; import cascading.tuple.Fields; import cascading.tuple.TupleEntryIterator; -import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.conf.Configuration; import java.io.File; import java.io.IOException; @@ -32,7 +32,7 @@ public static String getTempDir() { } @Override - public boolean commitResource(JobConf conf) throws java.io.IOException { + public boolean commitResource(Configuration conf) throws java.io.IOException { TupleEntryIterator it = new HadoopFlowProcess(conf).openTapForRead(this); System.out.println(""); System.out.println(""); @@ -45,4 +45,4 @@ public boolean commitResource(JobConf conf) throws java.io.IOException { it.close(); return true; } -} \ No newline at end of file +} diff --git a/project/plugins.sbt b/project/plugins.sbt index 5dacf1329c..be4e74c664 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -3,7 +3,7 @@ resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline. resolvers ++= Seq( "jgit-repo" at "http://download.eclipse.org/jgit/maven", "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases", - "Twitter Maven" at "http://maven.twttr.com" + "Twitter Maven" at "https://maven.twttr.com" ) addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2") diff --git a/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java b/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java index a436c75e6d..1ca84861c4 100644 --- a/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java +++ b/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java @@ -3,24 +3,25 @@ import java.io.IOException; import java.util.Arrays; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.SequenceFileInputFormat; import cascading.flow.FlowProcess; import cascading.scheme.SinkCall; import cascading.scheme.SourceCall; +import cascading.scheme.hadoop.WritableSequenceFile; +import cascading.tap.Tap; import cascading.tuple.Fields; import cascading.tuple.Tuple; import cascading.tuple.TupleEntry; -import com.twitter.elephantbird.cascading2.scheme.CombinedWritableSequenceFile; - /** - * + * Used in conjunction with VersionedKeyValSource. */ -public class KeyValueByteScheme extends CombinedWritableSequenceFile { +public class KeyValueByteScheme extends WritableSequenceFile { public KeyValueByteScheme(Fields fields) { super(fields, BytesWritable.class, BytesWritable.class); } @@ -30,7 +31,15 @@ public static byte[] getBytes(BytesWritable key) { } @Override - public boolean source(FlowProcess flowProcess, + public void sourceConfInit(FlowProcess flowProcess, + Tap tap, Configuration conf) { + super.sourceConfInit(flowProcess, tap, conf); + conf.setClass("mapred.input.format.class", SequenceFileInputFormat.class, + org.apache.hadoop.mapred.InputFormat.class); + } + + @Override + public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException { BytesWritable key = (BytesWritable) sourceCall.getContext()[0]; BytesWritable value = (BytesWritable) sourceCall.getContext()[1]; @@ -48,7 +57,7 @@ public boolean source(FlowProcess flowProcess, } @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall) + public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); diff --git a/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java b/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java index 93a014ed19..8dd0241ec9 100644 --- a/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java +++ b/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java @@ -5,17 +5,18 @@ import com.twitter.scalding.commons.datastores.VersionedStore; import com.twitter.scalding.tap.GlobHfs; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; import cascading.flow.FlowProcess; +import cascading.flow.hadoop.util.HadoopUtil; import cascading.scheme.Scheme; +import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR; + public class VersionedTap extends GlobHfs { public static enum TapMode {SOURCE, SINK} @@ -30,7 +31,7 @@ public static enum TapMode {SOURCE, SINK} // sink-specific private String newVersionPath; - public VersionedTap(String dir, Scheme scheme, TapMode mode) + public VersionedTap(String dir, Scheme scheme, TapMode mode) throws IOException { super(scheme, dir); this.mode = mode; @@ -59,11 +60,11 @@ public String getOutputDirectory() { return getPath().toString(); } - public VersionedStore getStore(JobConf conf) throws IOException { + public VersionedStore getStore(Configuration conf) throws IOException { return new VersionedStore(getPath().getFileSystem(conf), getOutputDirectory()); } - public String getSourcePath(JobConf conf) { + public String getSourcePath(Configuration conf) { VersionedStore store; try { store = getStore(conf); @@ -77,7 +78,7 @@ public String getSourcePath(JobConf conf) { } } - public String getSinkPath(JobConf conf) { + public String getSinkPath(Configuration conf) { try { VersionedStore store = getStore(conf); String sinkPath = (version == null) ? store.createVersion() : store.createVersion(version); @@ -91,38 +92,42 @@ public String getSinkPath(JobConf conf) { } @Override - public void sourceConfInit(FlowProcess process, JobConf conf) { + public void sourceConfInit(FlowProcess process, Configuration conf) { super.sourceConfInit(process, conf); - FileInputFormat.setInputPaths(conf, getSourcePath(conf)); + conf.unset("mapred.input.dir"); // need this to unset any paths set in super.sourceConfInit + conf.unset(INPUT_DIR); // need this to unset any paths set in super.sourceConfInit + Path fullyQualifiedPath = getFileSystem(conf).makeQualified(new Path(getSourcePath(conf))); + HadoopUtil.addInputPath(conf, fullyQualifiedPath); } @Override - public void sinkConfInit(FlowProcess process, JobConf conf) { + public void sinkConfInit(FlowProcess process, Configuration conf) { super.sinkConfInit(process, conf); if (newVersionPath == null) newVersionPath = getSinkPath(conf); - FileOutputFormat.setOutputPath(conf, new Path(newVersionPath)); + Path fullyQualifiedPath = getFileSystem(conf).makeQualified(new Path(newVersionPath)); + HadoopUtil.setOutputPath(conf, fullyQualifiedPath); } @Override - public long getSize(JobConf conf) throws IOException { + public long getSize(Configuration conf) throws IOException { return getSize(new Path(getSourcePath(conf)), conf); } @Override - public boolean resourceExists(JobConf jc) throws IOException { + public boolean resourceExists(Configuration jc) throws IOException { return getStore(jc).mostRecentVersion() != null; } @Override - public boolean createResource(JobConf jc) throws IOException { + public boolean createResource(Configuration jc) throws IOException { throw new UnsupportedOperationException("Not supported yet."); } @Override - public boolean deleteResource(JobConf jc) throws IOException { + public boolean deleteResource(Configuration jc) throws IOException { throw new UnsupportedOperationException("Not supported yet."); } @@ -136,13 +141,13 @@ public String getIdentifier() { } @Override - public long getModifiedTime(JobConf conf) throws IOException { + public long getModifiedTime(Configuration conf) throws IOException { VersionedStore store = getStore(conf); return (mode == TapMode.SINK) ? 0 : store.mostRecentVersion(); } @Override - public boolean commitResource(JobConf conf) throws IOException { + public boolean commitResource(Configuration conf) throws IOException { VersionedStore store = getStore(conf); if (newVersionPath != null) { @@ -155,7 +160,7 @@ public boolean commitResource(JobConf conf) throws IOException { return true; } - private static void markSuccessfulOutputDir(Path path, JobConf conf) throws IOException { + private static void markSuccessfulOutputDir(Path path, Configuration conf) throws IOException { FileSystem fs = path.getFileSystem(conf); // create a file in the folder to mark it if (fs.exists(path)) { @@ -165,7 +170,7 @@ private static void markSuccessfulOutputDir(Path path, JobConf conf) throws IOEx } @Override - public boolean rollbackResource(JobConf conf) throws IOException { + public boolean rollbackResource(Configuration conf) throws IOException { if (newVersionPath != null) { getStore(conf).failVersion(newVersionPath); newVersionPath = null; diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/scheme/CombinedSequenceFileScheme.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/scheme/CombinedSequenceFileScheme.scala deleted file mode 100644 index a987ca0e11..0000000000 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/scheme/CombinedSequenceFileScheme.scala +++ /dev/null @@ -1,16 +0,0 @@ -package com.twitter.scalding.commons.scheme - -import cascading.scheme.Scheme -import com.twitter.elephantbird.cascading2.scheme.{ CombinedSequenceFile, CombinedWritableSequenceFile } -import com.twitter.scalding.{ HadoopSchemeInstance, SequenceFileScheme, WritableSequenceFileScheme } - -trait CombinedSequenceFileScheme extends SequenceFileScheme { - // TODO Cascading doesn't support local mode yet - override def hdfsScheme = HadoopSchemeInstance(new CombinedSequenceFile(fields).asInstanceOf[Scheme[_, _, _, _, _]]) -} - -trait CombinedWritableSequenceFileScheme extends WritableSequenceFileScheme { - // TODO Cascading doesn't support local mode yet - override def hdfsScheme = - HadoopSchemeInstance(new CombinedWritableSequenceFile(fields, keyType, valueType).asInstanceOf[Scheme[_, _, _, _, _]]) -} \ No newline at end of file diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala index 09ad893905..9d5c1eb1d3 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala @@ -20,7 +20,7 @@ import scala.reflect.ClassTag import com.twitter.bijection._ import com.twitter.chill.Externalizer -import com.twitter.elephantbird.cascading2.scheme.LzoBinaryScheme +import com.twitter.elephantbird.cascading3.scheme.LzoBinaryScheme import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat import com.twitter.elephantbird.mapreduce.io.{ BinaryConverter, GenericWritable } import com.twitter.elephantbird.mapreduce.input.{ BinaryConverterProvider, MultiInputFormat } @@ -97,7 +97,7 @@ object LzoGenericScheme { /** * From a Binary Converter passed in configure in the JobConf using of that by ElephantBird */ - def setConverter[M](conv: BinaryConverter[M], conf: JobConf, confKey: String, overrideConf: Boolean = false): Unit = { + def setConverter[M](conv: BinaryConverter[M], conf: Configuration, confKey: String, overrideConf: Boolean = false): Unit = { if ((conf.get(confKey) == null) || overrideConf) { val extern = Externalizer(conv) try { @@ -120,9 +120,9 @@ class LzoGenericScheme[M](@transient conv: BinaryConverter[M], clazz: Class[M]) override protected def prepareBinaryWritable(): GenericWritable[M] = new GenericWritable(conv) - override def sourceConfInit(fp: FlowProcess[JobConf], - tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], - conf: JobConf): Unit = { + override def sourceConfInit(fp: FlowProcess[_ <: Configuration], + tap: Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]], + conf: Configuration): Unit = { LzoGenericScheme.setConverter(conv, conf, SourceConfigBinaryConverterProvider.ProviderConfKey) MultiInputFormat.setClassConf(clazz, conf) @@ -131,9 +131,9 @@ class LzoGenericScheme[M](@transient conv: BinaryConverter[M], clazz: Class[M]) DelegateCombineFileInputFormat.setDelegateInputFormat(conf, classOf[MultiInputFormat[_]]) } - override def sinkConfInit(fp: FlowProcess[JobConf], - tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], - conf: JobConf): Unit = { + override def sinkConfInit(fp: FlowProcess[_ <: Configuration], + tap: Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]], + conf: Configuration): Unit = { LzoGenericScheme.setConverter(conv, conf, SinkConfigBinaryConverterProvider.ProviderConfKey) LzoGenericBlockOutputFormat.setClassConf(clazz, conf) LzoGenericBlockOutputFormat.setGenericConverterClassConf(classOf[SinkConfigBinaryConverterProvider[_]], conf) diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala index eeb28fc929..def9bc1673 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala @@ -22,7 +22,7 @@ import cascading.scheme.Scheme import org.apache.thrift.TBase import com.google.protobuf.Message import com.twitter.bijection.Injection -import com.twitter.elephantbird.cascading2.scheme._ +import com.twitter.elephantbird.cascading3.scheme._ import com.twitter.scalding._ import com.twitter.scalding.Dsl._ import com.twitter.scalding.source.{ CheckedInversion, MaxFailuresCheck } diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala index 089968b9a4..83bc91907a 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala @@ -3,7 +3,7 @@ package com.twitter.scalding.commons.source import cascading.scheme.Scheme import cascading.scheme.hadoop.{ TextDelimited => CHTextDelimited } import cascading.scheme.local.{ TextDelimited => CLTextDelimited } -import com.twitter.elephantbird.cascading2.scheme.LzoTextDelimited +import com.twitter.elephantbird.cascading3.scheme.LzoTextDelimited import com.twitter.scalding._ import com.twitter.scalding.source.TypedTextDelimited import com.twitter.scalding.source.TypedSep diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala index 0a5878c762..28a9b474a4 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala @@ -32,7 +32,7 @@ import com.twitter.scalding.commons.tap.VersionedTap.TapMode import com.twitter.scalding.source.{ CheckedInversion, MaxFailuresCheck } import com.twitter.scalding.typed.KeyedListLike import com.twitter.scalding.typed.TypedSink -import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader } import scala.collection.JavaConverters._ /** @@ -69,7 +69,7 @@ class VersionedKeyValSource[K, V](val path: String, val sourceVersion: Option[Lo override def setter[U <: (K, V)] = TupleSetter.asSubSetter[(K, V), U](TupleSetter.of[(K, V)]) - def hdfsScheme = + def hdfsScheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _] = HadoopSchemeInstance(new KeyValueByteScheme(fields).asInstanceOf[Scheme[_, _, _, _, _]]) @deprecated("This method is deprecated", "0.1.6") @@ -77,7 +77,7 @@ class VersionedKeyValSource[K, V](val path: String, val sourceVersion: Option[Lo this(path, sourceVersion, sinkVersion, maxFailures, VersionedKeyValSource.defaultVersionsToKeep)(codec) def getTap(mode: TapMode) = { - val tap = new VersionedTap(path, hdfsScheme, mode).setVersionsToKeep(versionsToKeep) + val tap = new VersionedTap(path, Hadoop2SchemeInstance(hdfsScheme), mode).setVersionsToKeep(versionsToKeep) (sourceVersion, sinkVersion) match { case (Some(v), _) if mode == TapMode.SOURCE => tap.setVersion(v) diff --git a/scalding-commons/src/test/scala/com/twitter/scalding/commons/VersionedKeyValSourceTest.scala b/scalding-commons/src/test/scala/com/twitter/scalding/commons/VersionedKeyValSourceTest.scala index 1961fe60ec..93d38ed42b 100644 --- a/scalding-commons/src/test/scala/com/twitter/scalding/commons/VersionedKeyValSourceTest.scala +++ b/scalding-commons/src/test/scala/com/twitter/scalding/commons/VersionedKeyValSourceTest.scala @@ -15,13 +15,18 @@ limitations under the License. */ package com.twitter.scalding.commons.source +import org.apache.hadoop.fs.Path import org.scalatest.{ Matchers, WordSpec } import com.twitter.scalding._ import com.twitter.scalding.commons.datastores.VersionedStore import com.twitter.bijection.Injection import com.google.common.io.Files -import org.apache.hadoop.mapred.JobConf -import java.io.{ File, FileWriter } +import java.io.FileWriter + +import org.apache.hadoop.mapred.{ JobConf, SequenceFileInputFormat } +import java.io.File + +import org.apache.hadoop.conf.Configuration // Use the scalacheck generators import scala.collection.mutable.Buffer @@ -36,6 +41,12 @@ class TypedWriteIncrementalJob(args: Args) extends Job(args) { .writeIncremental(VersionedKeyValSource[Int, Int]("output")) } +// Test version of SequenceFileInputFormat to get details on which +// paths it will use +class TestSequenceFileInputFormat extends SequenceFileInputFormat[Int, Int] { + def getPaths(conf: JobConf): Array[Path] = super.listStatus(conf).map(_.getPath) +} + class MoreComplexTypedWriteIncrementalJob(args: Args) extends Job(args) { import RichPipeEx._ val pipe = TypedPipe.from(TypedTsv[Int]("input")) @@ -134,7 +145,7 @@ class VersionedKeyValSourceTest extends WordSpec with Matchers { val keyValueSize = VersionedKeyValSource(path) .source - .getSize(new JobConf()) + .getSize(new Configuration()) contentSize should be (keyValueSize) } @@ -150,7 +161,7 @@ class VersionedKeyValSourceTest extends WordSpec with Matchers { versions foreach { v => val p = store.createVersion(v) new File(p).mkdirs() - + // create a part file here contentFn(v) .foreach { text => val content = new FileWriter(new File(p + "/test")) @@ -158,6 +169,7 @@ class VersionedKeyValSourceTest extends WordSpec with Matchers { content.close() } + // and succeed store.succeedVersion(p) } @@ -168,7 +180,27 @@ class VersionedKeyValSourceTest extends WordSpec with Matchers { * Creates a VersionedKeyValSource using the provided version * and then validates it. */ - private def validateVersion(path: String, version: Option[Long] = None) = - VersionedKeyValSource(path = path, sourceVersion = version) - .validateTaps(Hdfs(false, new JobConf())) + private def validateVersion(path: String, version: Option[Long] = None) = { + val store = VersionedKeyValSource(path = path, sourceVersion = version) + val conf: JobConf = new JobConf() + store.validateTaps(Hdfs(strict = false, conf)) + + // also validate the paths for the version + validateVersionPaths(path, version, store, conf) + } + + def validateVersionPaths(path: String, version: Option[Long], store: VersionedKeyValSource[_, _], conf: JobConf): Unit = { + store.source.sourceConfInit(null, conf) // this sets up the splits needed for input format + val fileInputFormat = new TestSequenceFileInputFormat() + val paths = fileInputFormat.getPaths(conf) + version match { + case Some(ver) => + // expect only the part file for the specified version + assert(paths.length == paths.count(_.toString.endsWith(ver + "/part-00000"))) + case None => + // when no version is specified, we get the most recent version's data + val mostRecentVersion = store.source.getStore(conf).mostRecentVersion() + assert(paths.length == paths.count(_.toString.endsWith(mostRecentVersion + "/part-00000"))) + } + } } diff --git a/scalding-core/src/main/java/com/twitter/scalding/tap/GlobHfs.java b/scalding-core/src/main/java/com/twitter/scalding/tap/GlobHfs.java index 70b5775a45..b5bbe32301 100644 --- a/scalding-core/src/main/java/com/twitter/scalding/tap/GlobHfs.java +++ b/scalding-core/src/main/java/com/twitter/scalding/tap/GlobHfs.java @@ -3,6 +3,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -18,16 +19,16 @@ * that will throw IOException where we actually can calculate size of source. */ public class GlobHfs extends Hfs { - public GlobHfs(Scheme scheme) { + public GlobHfs(Scheme scheme) { super(scheme); } - public GlobHfs(Scheme scheme, String stringPath) { + public GlobHfs(Scheme scheme, String stringPath) { super(scheme, stringPath); } @Override - public long getSize(JobConf conf) throws IOException { + public long getSize(Configuration conf) throws IOException { return getSize(getPath(), conf); } @@ -35,7 +36,7 @@ public long getSize(JobConf conf) throws IOException { * Get the total size of the file(s) specified by the Hfs, which may contain a glob * pattern in its path, so we must be ready to handle that case. */ - public static long getSize(Path path, JobConf conf) throws IOException { + public static long getSize(Path path, Configuration conf) throws IOException { FileSystem fs = path.getFileSystem(conf); FileStatus[] statuses = fs.globStatus(path); diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala index 3b1d3bc752..0274d24b6b 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala @@ -24,7 +24,7 @@ import com.twitter.chill.config.{ ScalaMapConfig, ConfiguredInstantiator } import com.twitter.bijection.{ Base64String, Injection } import com.twitter.scalding.filecache.{CachedFile, DistributedCacheFile, HadoopCachedFile} -import cascading.pipe.assembly.AggregateBy +import cascading.pipe.assembly.AggregateByProps import cascading.flow.{ FlowListener, FlowStepListener, FlowProps, FlowStepStrategy } import cascading.property.AppProps import cascading.tuple.collect.SpillableProps @@ -133,7 +133,7 @@ trait Config extends Serializable { * the best results */ def setMapSideAggregationThreshold(count: Int): Config = - this + (AggregateBy.AGGREGATE_BY_THRESHOLD -> count.toString) + this + (AggregateByProps.AGGREGATE_BY_CAPACITY -> count.toString) /** * Set this configuration option to require all grouping/cogrouping @@ -412,6 +412,9 @@ trait Config extends Serializable { object Config { val CascadingAppName: String = "cascading.app.name" val CascadingAppId: String = "cascading.app.id" + // This is the old config AGGREGATE_BY_THRESHOLD which is no longer present in cascading3 + // We maintain our own copy to provide backward compatibility + val CascadingAggregateByThreshold = "cascading.aggregateby.threshold" val CascadingSerializationTokens = "cascading.serialization.tokens" val IoSerializationsKey: String = "io.serializations" val ScaldingFlowClassName: String = "scalding.flow.class.name" diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala index b00d6ca338..e83badbcb9 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala @@ -957,7 +957,7 @@ object ExecutionCounters { * Just gets the counters from the CascadingStats and ignores * all the other fields present */ - def fromCascading(cs: cascading.stats.CascadingStats): ExecutionCounters = new ExecutionCounters { + def fromCascading(cs: cascading.stats.CascadingStats[_]): ExecutionCounters = new ExecutionCounters { import scala.collection.JavaConverters._ val keys = (for { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala index f551b80b86..2022613d65 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala @@ -153,10 +153,10 @@ object ExecutionContext { private val LOG: Logger = LoggerFactory.getLogger(ExecutionContext.getClass) private[scalding] def getDesc[T](baseFlowStep: BaseFlowStep[T]): Seq[String] = { - baseFlowStep.getGraph.vertexSet.asScala.flatMap { + baseFlowStep.getElementGraph.vertexSet.asScala.toSeq.flatMap(_ match { case pipe: Pipe => RichPipe.getPipeDescriptions(pipe) case _ => List() // no descriptions - }(collection.breakOut) + })(collection.breakOut) } /* * implicit val ec = ExecutionContext.newContext(config) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala index 696a5e3877..a4cde45bcc 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala @@ -55,7 +55,9 @@ trait HfsTapProvider { def createHfsTap(scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _], path: String, sinkMode: SinkMode): Hfs = - new Hfs(scheme, path, sinkMode) + new Hfs( + Hadoop2SchemeInstance(scheme), + path, sinkMode) } private[scalding] object CastFileTap { @@ -435,7 +437,7 @@ trait SuccessFileSource extends FileSource { trait LocalTapSource extends LocalSourceOverride { override def createLocalTap(sinkMode: SinkMode): Tap[JobConf, _, _] = { val taps = localPaths.map { p => - new LocalTap(p, hdfsScheme, sinkMode).asInstanceOf[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] + new LocalTap(p, Hadoop2SchemeInstance(hdfsScheme), sinkMode).asInstanceOf[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] }.toSeq taps match { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala b/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala index af34965333..f3fa41f7e6 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala @@ -15,21 +15,22 @@ limitations under the License. */ package com.twitter.scalding +import cascading.flow.FlowProcess +import cascading.scheme.Scheme import cascading.tap.hadoop.Hfs import cascading.tap.SinkMode +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf -import cascading.flow.FlowProcess import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.OutputCollector -import cascading.scheme.Scheme private[scalding] class ConfPropertiesHfsTap( sourceConfig: Config, sinkConfig: Config, - scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _], + scheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _], stringPath: String, sinkMode: SinkMode) extends Hfs(scheme, stringPath, sinkMode) { - override def sourceConfInit(process: FlowProcess[JobConf], conf: JobConf): Unit = { + override def sourceConfInit(process: FlowProcess[_ <: Configuration], conf: Configuration): Unit = { sourceConfig.toMap.foreach { case (k, v) => conf.set(k, v) @@ -37,7 +38,7 @@ private[scalding] class ConfPropertiesHfsTap( super.sourceConfInit(process, conf) } - override def sinkConfInit(process: FlowProcess[JobConf], conf: JobConf): Unit = { + override def sinkConfInit(process: FlowProcess[_ <: Configuration], conf: Configuration): Unit = { sinkConfig.toMap.foreach { case (k, v) => conf.set(k, v) @@ -70,6 +71,6 @@ trait HfsConfPropertySetter extends HfsTapProvider { } else { (sourceConfig, sinkConfig) } - new ConfPropertiesHfsTap(srcCfg, sinkCfg, scheme, path, sinkMode) + new ConfPropertiesHfsTap(srcCfg, sinkCfg, Hadoop2SchemeInstance(scheme), path, sinkMode) } -} +} \ No newline at end of file diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Job.scala b/scalding-core/src/main/scala/com/twitter/scalding/Job.scala index b90c4346ac..aa39842a75 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Job.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Job.scala @@ -260,7 +260,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable { FlowStateMap.clear(flowDef) } - protected def handleStats(statsData: CascadingStats): Unit = { + protected def handleStats(statsData: CascadingStats[_]): Unit = { scaldingCascadingStats = Some(statsData) // TODO: Why the two ways to do stats? Answer: jank-den. if (args.boolean("scalding.flowstats")) { @@ -271,7 +271,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable { } // Print custom counters unless --scalding.nocounters is used or there are no custom stats if (!args.boolean("scalding.nocounters")) { - implicit val statProvider: CascadingStats = statsData + implicit val statProvider = statsData val jobStats = Stats.getAllCustomCounters if (!jobStats.isEmpty) { println("Dumping custom counters:") @@ -287,7 +287,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable { // This awful name is designed to avoid collision // with subclasses @transient - private[scalding] var scaldingCascadingStats: Option[CascadingStats] = None + private[scalding] var scaldingCascadingStats: Option[CascadingStats[_]] = None /** * Save the Flow object after a run to allow clients to inspect the job. diff --git a/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala b/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala index 299012a4cc..4b857611a8 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala @@ -22,7 +22,7 @@ import scala.util.{ Failure, Try } object JobStats { def empty: JobStats = new JobStats(Map("counters" -> Map.empty)) - def apply(stats: CascadingStats): JobStats = { + def apply(stats: CascadingStats[_]): JobStats = { val m: Map[String, Any] = statsMap(stats) new JobStats( stats match { @@ -31,14 +31,14 @@ object JobStats { }) } - private def counterMap(stats: CascadingStats): Map[String, Map[String, Long]] = + private def counterMap(stats: CascadingStats[_]): Map[String, Map[String, Long]] = stats.getCounterGroups.asScala.map { group => (group, stats.getCountersFor(group).asScala.map { counter => (counter, stats.getCounterValue(group, counter)) }.toMap) }.toMap - private def statsMap(stats: CascadingStats): Map[String, Any] = + private def statsMap(stats: CascadingStats[_]): Map[String, Any] = Map( "counters" -> counterMap(stats), "duration" -> stats.getDuration, diff --git a/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala b/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala index ffc807a829..e83b5b9c70 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala @@ -63,7 +63,7 @@ object CascadeTest { class JobTest(cons: (Args) => Job) { private var argsMap = Map[String, List[String]]() private val callbacks = Buffer[() => Unit]() - private val statsCallbacks = Buffer[(CascadingStats) => Unit]() + private val statsCallbacks = Buffer[(CascadingStats[_]) => Unit]() // TODO: Switch the following maps and sets from Source to String keys // to guard for scala equality bugs private var sourceMap: (Source) => Option[Buffer[Tuple]] = { _ => None } @@ -131,13 +131,13 @@ class JobTest(cons: (Args) => Job) { // If this test is checking for multiple jobs chained by next, this only checks // for the counters in the final job's FlowStat. def counter(counter: String, group: String = Stats.ScaldingGroup)(op: Long => Unit) = { - statsCallbacks += ((stats: CascadingStats) => op(Stats.getCounterValue(counter, group)(stats))) + statsCallbacks += ((stats: CascadingStats[_]) => op(Stats.getCounterValue(counter, group)(stats))) this } // Used to check an assertion on all custom counters of a given scalding job. def counters(op: Map[String, Long] => Unit) = { - statsCallbacks += ((stats: CascadingStats) => op(Stats.getAllCustomCounters()(stats))) + statsCallbacks += ((stats: CascadingStats[_]) => op(Stats.getAllCustomCounters()(stats))) this } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala b/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala index 248c4ae46e..6b69b40986 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala @@ -44,11 +44,11 @@ class MemoryTap[In, Out](val scheme: Scheme[Properties, In, Out, _, _], val tupl override def getModifiedTime(conf: Properties) = if (resourceExists(conf)) modifiedTime else 0L override lazy val getIdentifier: String = scala.math.random.toString - override def openForRead(flowProcess: FlowProcess[Properties], input: In) = { + override def openForRead(flowProcess: FlowProcess[_ <: Properties], input: In) = { new TupleEntryChainIterator(scheme.getSourceFields, tupleBuffer.toIterator.asJava) } - override def openForWrite(flowProcess: FlowProcess[Properties], output: Out): TupleEntryCollector = { + override def openForWrite(flowProcess: FlowProcess[_ <: Properties], output: Out): TupleEntryCollector = { tupleBuffer.clear() new MemoryTupleEntryCollector(tupleBuffer, this) } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala index c7425471e4..52829654a5 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala @@ -18,7 +18,7 @@ package com.twitter.scalding { import cascading.operation._ import cascading.tuple._ import cascading.flow._ - import cascading.pipe.assembly.AggregateBy + import cascading.pipe.assembly.{ AggregateBy, AggregateByProps } import com.twitter.chill.MeatLocker import scala.collection.JavaConverters._ @@ -286,15 +286,25 @@ package com.twitter.scalding { } object MapsideCache { - val DEFAULT_CACHE_SIZE = 100000 - val SIZE_CONFIG_KEY = AggregateBy.AGGREGATE_BY_THRESHOLD val ADAPTIVE_CACHE_KEY = "scalding.mapsidecache.adaptive" - - private def getCacheSize(fp: FlowProcess[_]): Int = - Option(fp.getStringProperty(SIZE_CONFIG_KEY)) - .filterNot { _.isEmpty } - .map { _.toInt } - .getOrElse(DEFAULT_CACHE_SIZE) + val DEFAULT_CACHE_SIZE = 100000 + val CASCADING2_SIZE_CONFIG_KEY = Config.CascadingAggregateByThreshold + val CASCADING3_SIZE_CONFIG_KEY = AggregateByProps.AGGREGATE_BY_CAPACITY + + def getCacheSize(fp: FlowProcess[_]): Int = { + def getInt(k: String): Option[Int] = Option(fp.getStringProperty(k)).filterNot(_.isEmpty).map(_.toInt) + val cascading2Property = getInt(CASCADING2_SIZE_CONFIG_KEY) + val cascading3Property = getInt(CASCADING3_SIZE_CONFIG_KEY) + // we support both old and new properties for backward compatibility + // and pick the max of the two, when both exist + val sizeFromProperty = (cascading2Property, cascading3Property) match { + case (Some(a), Some(b)) => Some(Ordering[Int].max(a, b)) + case (None, None) => None + case (Some(a), _) => Some(a) + case (_, Some(b)) => Some(b) + } + sizeFromProperty.getOrElse(DEFAULT_CACHE_SIZE) + } def apply[K, V: Semigroup](cacheSize: Option[Int], flowProcess: FlowProcess[_]): MapsideCache[K, V] = { val size = cacheSize.getOrElse{ getCacheSize(flowProcess) } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/PartitionSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/PartitionSource.scala index 04eb29a433..7e3aec0a02 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/PartitionSource.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/PartitionSource.scala @@ -91,8 +91,7 @@ abstract class PartitionSource(val openWritesThreshold: Option[Int] = None) exte /** * An implementation of TSV output, split over a partition tap. * - * Similar to TemplateSource, but with addition of tsvFields, to - * let users explicitly specify which fields they want to see in + * tsvFields lets users explicitly specify which fields they want to see in * the TSV (allows user to discard path fields). * * apply assumes user wants a DelimitedPartition (the only diff --git a/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala index 61e5077b9f..fc69d61ea2 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala @@ -128,6 +128,110 @@ object RichPipe extends java.io.Serializable { pipe.getParent == null && (pipe.getPrevious == null || pipe.getPrevious.isEmpty) && (!pipe.isInstanceOf[Splice]) + + /* + * If hashJoinPipe represents a hashjoin, this method checks if + * hashJoinOperandPipe is one of the two sides in that hashjoin. + */ + def isHashJoinedWithPipe(hashJoinPipe: Pipe, hashJoinOperandPipe: Pipe): Boolean = { + // collects all Eachs ending with a non-Each + @annotation.tailrec + def getChainOfEachs(p: Pipe, collect: List[Pipe]): Set[Pipe] = + p match { + case p if isSourcePipe(p) => + (p :: collect).toSet + case each: Each => + getChainOfEachs(each.getPrevious.head, each :: collect) + // we don't use a special Pipe subtype for the assignName method + // and we can't. all Pipe types need to be defined in cascading + // because cascading assumes it knows all the Pipe subtypes + // and fails to match any others (think of it as a sealed trait) + // So we handle all special types before checking for the assignName case + case other @ (hj: HashJoin) => + getJoinedPipeSet(hj) ++ collect + case other @ (_: Checkpoint | _: Operator | _: Splice /* except HashJoin*/ | _: SubAssembly) => + (other :: collect).toSet + case renamedPipe: Pipe => + // this is the assignName case + getChainOfEachs(renamedPipe.getPrevious.head, renamedPipe :: collect) + } + + def getJoinedPipeSet(p: HashJoin): Set[Pipe] = + p.getPrevious match { + case a @ Array(_, _) => + // collect nodes up the left and right sides + a.flatMap { p => getChainOfEachs(p, Nil) }.toSet + case other => + throw new IllegalStateException(s"More than two sides found in cascading's HashJoin pipe: $other") + } + + hashJoinPipe match { + case hj: HashJoin => + getJoinedPipeSet(hj).intersect(getChainOfEachs(hashJoinOperandPipe, Nil)).nonEmpty + case m: Merge => + m.getPrevious // gets all merged pipes + .exists { p => isHashJoinedWithPipe(p, hashJoinOperandPipe) } + case e: Each => + getSinglePreviousPipe(e) + .exists { p => isHashJoinedWithPipe(p, hashJoinOperandPipe) } + case other => + false + } + } + + /** + * Special handling for cases where one side of the hashjoin is merged + * with the hashjoin result. Cascading no longer allows it (as of 3.0), + * so we insert checkpoints and/or intermediate merge stages as appropriate + * + * @param head the first pipe to be merged + * @param tail a list of other pipes to be merged within the first + * + * @return an updated list of pipes, which can safely be merged + */ + private[scalding] def mergeAvoidingHashes(head: Pipe, tail: List[Pipe]): Pipe = { + + // we make use of the fact that the pipe merge operation is not just associative but also commutative + val pipes = head :: tail + val (colliding, uncolliding) = pipes.partition(p => pipes.exists(o => (o != p) && isHashJoinedWithPipe(p, o))) + + val (innerColliding, innerUncolliding) = colliding.partition(p => + colliding.exists(o => (o != p) && (isHashJoinedWithPipe(p, o) || isHashJoinedWithPipe(o, p)))) + /* innerUncolliding pipes collide with some pipes in the uncolliding set, but don't collide with one another. + It is fine to lump them and merge them together before merging with the 'uncolliding' set. + + innerColliding pipes collide with one another and must each be checkpointed before use in the general merge. + */ + val safedInnerColliding = innerColliding.map(new Checkpoint(_)) + val safedInnerUncolliding = + if (innerUncolliding.isEmpty) Nil + else List(new Checkpoint(mergeAvoidingNameClashes(innerUncolliding.head, innerUncolliding.tail))) + + val reassembled = safedInnerColliding ::: safedInnerUncolliding ::: uncolliding + mergeAvoidingNameClashes(reassembled.head, reassembled.tail) + } + + /** + * Cascading Merge does not support having multiple incoming pipes with the same name. + * Selectively rename pipes to avoid naming conflicts. + * + * @param head the first pipe to be merged (or checkpointed) + * @param tail a list of other pipes to be merged within the first + * @return a Merge of input pipes with any name clashes removed, or the input pipe if there was only one + */ + private[scalding] def mergeAvoidingNameClashes(head: Pipe, tail: List[Pipe]): Pipe = tail match { + case Nil => head // avoid generating new Merge(pipes.head) + case _ => + val (result, buf) = tail.foldLeft((List[Pipe](head), Set[String](head.getName))) { + case ((result, names), p) => + if (names.contains(p.getName)) + (assignName(p) :: result, names) /* no need to add the new name to names: assignName is guaranteed unique + and never assigned again */ + else (p :: result, names + p.getName) + } + new Merge(result: _*) + } + } /** @@ -139,6 +243,7 @@ class RichPipe(val pipe: Pipe) extends java.io.Serializable with JoinAlgorithms // We need this for the implicits import Dsl._ import RichPipe.assignName + import RichPipe.isHashJoinedWithPipe /** * Rename the current pipe @@ -243,15 +348,22 @@ class RichPipe(val pipe: Pipe) extends java.io.Serializable with JoinAlgorithms /** * Merge or Concatenate several pipes together with this one: */ - def ++(that: Pipe): Pipe = { - if (this.pipe == that) { - // Cascading fails on self merge: - // solution by Jack Guo - new Merge(assignName(this.pipe), assignName(new Each(that, new Identity))) - } else { - new Merge(assignName(this.pipe), assignName(that)) + def ++(that: Pipe): Pipe = + (this.pipe, that) match { + case (a, b) if a == b => + // Cascading fails on self merge: + // solution by Jack Guo + new Merge(assignName(a), assignName(new Each(b, new Identity))) + // special handling for cases where one side of the hashjoin is merged + // with the hashjoin result. Cascading no longer allows it, + // so we add a checkpoint to the join result as a workaround + case (a, b) if isHashJoinedWithPipe(a, b) => + new Merge(assignName(new Checkpoint(a)), assignName(b)) + case (a, b) if isHashJoinedWithPipe(b, a) => + new Merge(assignName(a), assignName(new Checkpoint(b))) + case (a, b) => + new Merge(assignName(a), assignName(b)) } - } /** * Group all tuples down to one reducer. diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Source.scala b/scalding-core/src/main/scala/com/twitter/scalding/Source.scala index 35c9a30b0b..db93ee8162 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Source.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Source.scala @@ -28,6 +28,7 @@ import cascading.tuple.{ Fields, Tuple => CTuple, TupleEntry, TupleEntryCollecto import cascading.pipe.Pipe +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.InputFormat import org.apache.hadoop.mapred.InputSplit import org.apache.hadoop.mapred.JobConf @@ -66,7 +67,7 @@ class InvalidSourceTap(val e: Throwable) extends SourceTap[JobConf, RecordReader override def getModifiedTime(conf: JobConf): Long = 0L - override def openForRead(flow: FlowProcess[JobConf], input: RecordReader[_, _]): TupleEntryIterator = throw new InvalidSourceException("Encountered InvalidSourceTap!", e) + override def openForRead(flow: FlowProcess[_ <: JobConf], input: RecordReader[_, _]): TupleEntryIterator = throw new InvalidSourceException("Encountered InvalidSourceTap!", e) override def resourceExists(conf: JobConf): Boolean = false @@ -81,8 +82,10 @@ class InvalidSourceTap(val e: Throwable) extends SourceTap[JobConf, RecordReader // 4. source.validateTaps (throws InvalidSourceException) // In the worst case if the flow plan is misconfigured, // openForRead on mappers should fail when using this tap. - override def sourceConfInit(flow: FlowProcess[JobConf], conf: JobConf): Unit = { - conf.setInputFormat(classOf[InvalidInputFormat]) + override def sourceConfInit(flow: FlowProcess[_ <: JobConf], conf: JobConf): Unit = { + conf.setClass("mapred.input.format.class", + classOf[InvalidInputFormat], + classOf[InputFormat[_, _]]); super.sourceConfInit(flow, conf) } } @@ -114,6 +117,11 @@ object HadoopSchemeInstance { scheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] } +object Hadoop2SchemeInstance { + def apply(scheme: Scheme[_, _, _, _, _]) = + scheme.asInstanceOf[Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _]] +} + object CastHfsTap { // The scala compiler has problems with the generics in Cascading def apply(tap: Hfs): Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]] = @@ -288,7 +296,7 @@ class NullTap[Config, Input, Output, SourceContext, SinkContext] SinkMode.UPDATE) { def getIdentifier = "nullTap" - def openForWrite(flowProcess: FlowProcess[Config], output: Output) = + def openForWrite(flowProcess: FlowProcess[_ <: Config], output: Output) = new TupleEntryCollector { override def add(te: TupleEntry): Unit = () override def add(t: CTuple): Unit = () diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala index 44a12d435f..6893e9533c 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala @@ -85,11 +85,11 @@ object Stats { // When getting a counter value, cascadeStats takes precedence (if set) and // flowStats is used after that. Returns None if neither is defined. - def getCounterValue(key: StatKey)(implicit cascadingStats: CascadingStats): Long = + def getCounterValue(key: StatKey)(implicit cascadingStats: CascadingStats[_]): Long = cascadingStats.getCounterValue(key.group, key.counter) // Returns a map of all custom counter names and their counts. - def getAllCustomCounters()(implicit cascadingStats: CascadingStats): Map[String, Long] = + def getAllCustomCounters()(implicit cascadingStats: CascadingStats[_]): Map[String, Long] = cascadingStats.getCountersFor(ScaldingGroup) .asScala .map { counter => diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TemplateSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/TemplateSource.scala deleted file mode 100644 index 00c74feaa8..0000000000 --- a/scalding-core/src/main/scala/com/twitter/scalding/TemplateSource.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* -Copyright 2013 Inkling, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package com.twitter.scalding - -import cascading.tap.hadoop.Hfs -import cascading.tap.hadoop.{ TemplateTap => HTemplateTap } -import cascading.tap.local.FileTap -import cascading.tap.local.{ TemplateTap => LTemplateTap } -import cascading.tap.SinkMode -import cascading.tap.Tap -import cascading.tuple.Fields - -/** - * This is a base class for template based output sources - */ -abstract class TemplateSource extends SchemedSource with HfsTapProvider { - - // The root path of the templated output. - def basePath: String - // The template as a java Formatter string. e.g. %s/%s for a two part template. - def template: String - // The fields to apply to the template. - def pathFields: Fields = Fields.ALL - - /** - * Creates the template tap. - * - * @param readOrWrite Describes if this source is being read from or written to. - * @param mode The mode of the job. (implicit) - * - * @return A cascading TemplateTap. - */ - override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { - readOrWrite match { - case Read => throw new InvalidSourceException("Cannot use TemplateSource for input") - case Write => { - mode match { - case Local(_) => { - val localTap = new FileTap(localScheme, basePath, sinkMode) - new LTemplateTap(localTap, template, pathFields) - } - case hdfsMode @ Hdfs(_, _) => { - val hfsTap = createHfsTap(hdfsScheme, basePath, sinkMode) - new HTemplateTap(hfsTap, template, pathFields) - } - case hdfsTest @ HadoopTest(_, _) => { - val hfsTap = createHfsTap(hdfsScheme, hdfsTest.getWritePathFor(this), sinkMode) - new HTemplateTap(hfsTap, template, pathFields) - } - case _ => TestTapFactory(this, hdfsScheme).createTap(readOrWrite) - } - } - } - } - - /** - * Validates the taps, makes sure there are no nulls as the path or template. - * - * @param mode The mode of the job. - */ - override def validateTaps(mode: Mode): Unit = { - if (basePath == null) { - throw new InvalidSourceException("basePath cannot be null for TemplateTap") - } else if (template == null) { - throw new InvalidSourceException("template cannot be null for TemplateTap") - } - } -} - -/** - * An implementation of TSV output, split over a template tap. - * - * @param basePath The root path for the output. - * @param template The java formatter style string to use as the template. e.g. %s/%s. - * @param pathFields The set of fields to apply to the path. - * @param writeHeader Flag to indicate that the header should be written to the file. - * @param sinkMode How to handle conflicts with existing output. - * @param fields The set of fields to apply to the output. - */ -case class TemplatedTsv( - override val basePath: String, - override val template: String, - override val pathFields: Fields = Fields.ALL, - override val writeHeader: Boolean = false, - override val sinkMode: SinkMode = SinkMode.REPLACE, - override val fields: Fields = Fields.ALL) - extends TemplateSource with DelimitedScheme - -/** - * An implementation of SequenceFile output, split over a template tap. - * - * @param basePath The root path for the output. - * @param template The java formatter style string to use as the template. e.g. %s/%s. - * @param sequenceFields The set of fields to use for the sequence file. - * @param pathFields The set of fields to apply to the path. - * @param sinkMode How to handle conflicts with existing output. - */ -case class TemplatedSequenceFile( - override val basePath: String, - override val template: String, - val sequenceFields: Fields = Fields.ALL, - override val pathFields: Fields = Fields.ALL, - override val sinkMode: SinkMode = SinkMode.REPLACE) - extends TemplateSource with SequenceFileScheme { - - override val fields = sequenceFields -} diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala b/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala index 75238144d7..bf044c78d0 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala @@ -25,6 +25,7 @@ import cascading.scheme.NullScheme import java.io.{ Serializable, InputStream, OutputStream } +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.RecordReader @@ -50,7 +51,7 @@ object TestTapFactory extends Serializable { new TestTapFactory(src, sinkMode) { override def hdfsScheme = Some(scheme) } } -class TestTapFactory(src: Source, sinkMode: SinkMode) extends Serializable { +class TestTapFactory(src: Source, sinkMode: SinkMode) extends Serializable with HfsTapProvider { def sourceFields: Fields = hdfsScheme.map { _.getSourceFields }.getOrElse(sys.error("No sourceFields defined")) @@ -94,12 +95,12 @@ class TestTapFactory(src: Source, sinkMode: SinkMode) extends Serializable { val fields = sourceFields (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[Tap[JobConf, _, _]] } else { - CastHfsTap(new Hfs(hdfsScheme.get, hdfsTest.getWritePathFor(src), sinkMode)) + CastHfsTap(createHfsTap(hdfsScheme.get, hdfsTest.getWritePathFor(src), sinkMode)) } } case Write => { val path = hdfsTest.getWritePathFor(src) - CastHfsTap(new Hfs(hdfsScheme.get, path, sinkMode)) + CastHfsTap(createHfsTap(hdfsScheme.get, path, sinkMode)) } } case _ => { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Tool.scala b/scalding-core/src/main/scala/com/twitter/scalding/Tool.scala index e428a57fc0..e54fad92ba 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Tool.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Tool.scala @@ -17,10 +17,9 @@ package com.twitter.scalding import cascading.flow.hadoop.HadoopFlow import cascading.flow.planner.BaseFlowStep - import org.apache.hadoop.conf.Configured import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.util.{ GenericOptionsParser, Tool => HTool, ToolRunner } +import org.apache.hadoop.util.{ GenericOptionsParser, ToolRunner, Tool => HTool } import scala.annotation.tailrec import scala.collection.JavaConverters._ @@ -98,18 +97,17 @@ class Tool extends Configured with HTool { flow match { case hadoopFlow: HadoopFlow => val flowSteps = hadoopFlow.getFlowSteps.asScala - flowSteps.foreach(step => { - val baseFlowStep: BaseFlowStep[JobConf] = step.asInstanceOf[BaseFlowStep[JobConf]] - val descriptions = baseFlowStep.getConfig.get(Config.StepDescriptions, "") - if (!descriptions.isEmpty) { - val stepXofYData = """\(\d+/\d+\)""".r.findFirstIn(baseFlowStep.getName).getOrElse("") - // Reflection is only temporary. Latest cascading has setName public: https://github.com/cwensel/cascading/commit/487a6e9ef#diff-0feab84bc8832b2a39312dbd208e3e69L175 - // https://github.com/twitter/scalding/issues/1294 - val x = classOf[BaseFlowStep[JobConf]].getDeclaredMethod("setName", classOf[String]) - x.setAccessible(true) - x.invoke(step, "%s %s".format(stepXofYData, descriptions)) - } - }) + + flowSteps.foreach { + case baseFlowStep: BaseFlowStep[JobConf @unchecked] => + val descriptions = Option(baseFlowStep.getConfig.get(Config.StepDescriptions)) + val stepXofYData = """\(\d+/\d+\)""".r.findFirstIn(baseFlowStep.getName) + + (descriptions, stepXofYData) match { + case (Some(d), Some(s)) => baseFlowStep.setName(s"${s} ${d}") + case (_, _) => () // keep the existing name, don't apply the description + } + } case _ => // descriptions not yet supported in other modes } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Tracing.scala b/scalding-core/src/main/scala/com/twitter/scalding/Tracing.scala index f70e81b3ae..76469a86ad 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Tracing.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Tracing.scala @@ -18,6 +18,7 @@ package com.twitter.scalding import java.lang.reflect.InvocationTargetException +import cascading.util.TraceUtil import org.slf4j.{ Logger, LoggerFactory => LogManager } /** @@ -34,10 +35,6 @@ import org.slf4j.{ Logger, LoggerFactory => LogManager } object Tracing { private val LOG: Logger = LogManager.getLogger(this.getClass) - // TODO: remove this once we no longer want backwards compatiblity - // with cascading versions pre 2.6 - private val traceUtilClassName = "cascading.util.TraceUtil" - /** * Put a barrier at com.twitter.scalding, but exclude things like Tool * that are common entry points for calling user code @@ -58,7 +55,7 @@ object Tracing { * tracing boundary. Normally not needed, but may be useful * after a call to unregister() */ - def register(regex: String = defaultRegex) = invokeStaticMethod(traceUtilClassName, "registerApiBoundary", regex) + def register(regex: String = defaultRegex) = TraceUtil.registerApiBoundary(regex) /** * Unregisters "com.twitter.scalding" as a Cascading @@ -68,27 +65,6 @@ object Tracing { * should normally not be called but can be useful in testing * the development of Scalding internals */ - def unregister(regex: String = defaultRegex) = invokeStaticMethod(traceUtilClassName, "unregisterApiBoundary", regex) + def unregister(regex: String = defaultRegex) = TraceUtil.unregisterApiBoundary(regex) - /** - * Use reflection to register/unregister tracing boundaries so that cascading versions prior to 2.6 can be used - * without completely breaking - */ - private def invokeStaticMethod(clazz: String, methodName: String, args: AnyRef*): Unit = { - try { - val argTypes = args map (_.getClass()) - Class.forName(clazz).getMethod(methodName, argTypes: _*).invoke(null, args: _*) - } catch { - case e @ (_: NoSuchMethodException | - _: SecurityException | - _: IllegalAccessException | - _: IllegalArgumentException | - _: InvocationTargetException | - _: NullPointerException | - _: ClassNotFoundException) => LOG.warn("There was an error initializing tracing. " + - "Tracing information in DocumentServices such as Driven may point to Scalding code instead of " + - "user code. The most likely cause is a mismatch in Cascading library version. Upgrading the " + - "Cascading library to at least 2.6 should fix this issue.The cause was [" + e + "]") - } - } } \ No newline at end of file diff --git a/scalding-core/src/main/scala/com/twitter/scalding/estimation/Common.scala b/scalding-core/src/main/scala/com/twitter/scalding/estimation/Common.scala index 0c8da2e65e..28c3836079 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/estimation/Common.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/estimation/Common.scala @@ -19,7 +19,7 @@ object Common { } def unrollTaps(step: FlowStep[JobConf]): Seq[Tap[_, _, _]] = - unrollTaps(step.getSources.asScala.toSeq) + unrollTaps(step.getFlowNodeGraph.getSourceTaps.asScala.toSeq) def inputSizes(step: FlowStep[JobConf]): Seq[(String, Long)] = { val conf = step.getConfig diff --git a/scalding-core/src/main/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorStepStrategy.scala b/scalding-core/src/main/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorStepStrategy.scala index 540775e7b9..6ce1af2c61 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorStepStrategy.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorStepStrategy.scala @@ -68,7 +68,11 @@ object MemoryEstimatorStepStrategy extends FlowStepStrategy[JobConf] { case Some(MemoryEstimate(_, Some(reduceMem))) => LOG.info(s"Overriding only reduce memory to: $reduceMem in Mb") setMemory(reduceMem, (Config.ReduceJavaOpts, Config.ReduceMemory), conf) - case _ => LOG.info("Memory estimators didn't calculate any value. Skipping setting memory overrides") + case _ => + LOG.info("Memory estimators didn't calculate any value. Skipping setting memory overrides") + // explicitly unset these as Cascading seems to set them to 1024M + conf.unset(Config.MapMemory) + conf.unset(Config.ReduceMemory) } } } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimator.scala b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimator.scala index abf76278b2..71f6bbaad8 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimator.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimator.scala @@ -1,6 +1,6 @@ package com.twitter.scalding.reducer_estimation -import com.twitter.scalding.estimation.{Common, FlowStepHistory, FlowStrategyInfo} +import com.twitter.scalding.estimation.{ Common, FlowStepHistory, FlowStrategyInfo } import org.apache.hadoop.mapred.JobConf import org.slf4j.LoggerFactory @@ -41,8 +41,7 @@ abstract class RatioBasedEstimator extends ReducerHistoryEstimator { override protected def estimate( info: FlowStrategyInfo, conf: JobConf, - history: Seq[FlowStepHistory] - ): Option[Int] = { + history: Seq[FlowStepHistory]): Option[Int] = { val threshold = RatioBasedEstimator.getInputRatioThreshold(conf) val inputBytes = Common.totalInputSize(info.step) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/ReducerEstimatorStepStrategy.scala b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/ReducerEstimatorStepStrategy.scala index 61581587a0..5135ac5fd6 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/ReducerEstimatorStepStrategy.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/ReducerEstimatorStepStrategy.scala @@ -25,20 +25,19 @@ object ReducerEstimatorStepStrategy extends FlowStepStrategy[JobConf] { final override def apply( flow: Flow[JobConf], preds: JList[FlowStep[JobConf]], - step: FlowStep[JobConf] - ): Unit = { + step: FlowStep[JobConf]): Unit = { val conf = step.getConfig // for steps with reduce phase, mapred.reduce.tasks is set in the jobconf at this point // so we check that to determine if this is a map-only step. conf.getNumReduceTasks match { - case 0 => LOG.info(s"${ flow.getName } is a map-only step. Skipping reducer estimation.") + case 0 => LOG.info(s"${flow.getName} is a map-only step. Skipping reducer estimation.") case _ => if (skipReducerEstimation(step)) { LOG.info( s""" - |Flow step ${ step.getName } was configured with reducers - |set explicitly (${ Config.WithReducersSetExplicitly }=true) and the estimator - |explicit override turned off (${ Config.ReducerEstimatorOverride }=false). Skipping + |Flow step ${step.getName} was configured with reducers + |set explicitly (${Config.WithReducersSetExplicitly}=true) and the estimator + |explicit override turned off (${Config.ReducerEstimatorOverride}=false). Skipping |reducer estimation. """.stripMargin) } else { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimator.scala b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimator.scala index c516434164..8f5963259d 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimator.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimator.scala @@ -105,16 +105,15 @@ trait BasicRuntimeReducerEstimator extends ReducerHistoryEstimator { override protected def estimate( info: FlowStrategyInfo, conf: JobConf, - history: Seq[FlowStepHistory] - ): Option[Int] = { + history: Seq[FlowStepHistory]): Option[Int] = { val reduceTimes: Seq[Seq[Double]] = getReduceTimes(history) LOG.info( s"""| |History items have the following numbers of tasks: - | ${ history.map(_.tasks.length) }, + | ${history.map(_.tasks.length)}, |and the following numbers of tasks have valid task histories: - | ${ reduceTimes.map(_.length) }""".stripMargin) + | ${reduceTimes.map(_.length)}""".stripMargin) // total time taken in the step = time per reducer * number of reducers val jobTimes: Seq[Option[Double]] = reduceTimes @@ -148,16 +147,15 @@ trait InputScaledRuntimeReducerEstimator extends ReducerHistoryEstimator { override protected def estimate( info: FlowStrategyInfo, conf: JobConf, - history: Seq[FlowStepHistory] - ): Option[Int] = { + history: Seq[FlowStepHistory]): Option[Int] = { val reduceTimes: Seq[Seq[Double]] = getReduceTimes(history) LOG.info( s"""| |History items have the following numbers of tasks: - | ${ history.map(_.tasks.length) }, + | ${history.map(_.tasks.length)}, |and the following numbers of tasks have valid task histories: - | ${ reduceTimes.map(_.length) }""".stripMargin) + | ${reduceTimes.map(_.length)}""".stripMargin) // total time taken in the step = time per reducer * number of reducers val jobTimes: Seq[Option[Double]] = reduceTimes @@ -188,7 +186,7 @@ trait InputScaledRuntimeReducerEstimator extends ReducerHistoryEstimator { LOG.info( s""" - | - HDFS bytes read: ${ history.map(_.hdfsBytesRead) } + | - HDFS bytes read: ${history.map(_.hdfsBytesRead)} | - Time-to-byte-ratios: $timeToByteRatios | - Typical type-to-byte-ratio: $typicalTimeToByteRatio | - Desired runtime: $desiredRuntime diff --git a/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala b/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala index 75c9cfae41..2a543110f7 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala @@ -77,7 +77,7 @@ object CascadingBinaryComparator { def getDescriptionsForMissingOrdSer[U](bfs: BaseFlowStep[U]): Option[String] = // does this job have any Splices without OrderedSerialization: - if (bfs.getGraph.vertexSet.asScala.exists { + if (bfs.getElementGraph.vertexSet.asScala.exists { case gb: GroupBy => check(gb).isFailure case cg: CoGroup => check(cg).isFailure case _ => false // only do sorting in groupBy/cogroupBy diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala index a03d2afc93..0a24f2d840 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala @@ -2,26 +2,17 @@ package com.twitter.scalding.typed.cascading_backend import cascading.flow.FlowDef import cascading.operation.Operation -import cascading.pipe.{ CoGroup, Each, Pipe, HashJoin } -import cascading.tuple.{ Fields, Tuple => CTuple, TupleEntry } -import com.twitter.scalding.TupleConverter.{ singleConverter, tuple2Converter } -import com.twitter.scalding.TupleSetter.{ singleSetter, tup2Setter } -import com.twitter.scalding.{ - CleanupIdentityFunction, Config, Dsl, Field, FlatMapFunction, FlowStateMap, GroupBuilder, - HadoopMode, LineNumber, IterableSource, MapsideReduce, Mode, - RichPipe, TupleConverter, TupleGetter, TupleSetter, TypedBufferOp, WrappedJoiner, Write -} +import cascading.pipe.{CoGroup, Each, HashJoin, Pipe} +import cascading.tuple.{Fields, TupleEntry, Tuple => CTuple} +import com.twitter.scalding.TupleConverter.{singleConverter, tuple2Converter} +import com.twitter.scalding.TupleSetter.{singleSetter, tup2Setter} +import com.twitter.scalding.{CleanupIdentityFunction, Config, Dsl, Field, FlatMapFunction, FlowStateMap, GroupBuilder, HadoopMode, IterableSource, LineNumber, MapsideReduce, Mode, RichPipe, TupleConverter, TupleGetter, TupleSetter, TypedBufferOp, WrappedJoiner, Write} import com.twitter.scalding.typed._ -import com.twitter.scalding.serialization.{ - Boxed, - BoxedOrderedSerialization, - CascadingBinaryComparator, - EquivSerialization, - OrderedSerialization, - WrappedSerialization -} +import com.twitter.scalding.serialization.{Boxed, BoxedOrderedSerialization, CascadingBinaryComparator, EquivSerialization, OrderedSerialization, WrappedSerialization} import java.util.WeakHashMap -import scala.collection.mutable.{ Map => MMap } + +import scala.collection.immutable +import scala.collection.mutable.{Map => MMap} object CascadingBackend { import TypedPipe._ @@ -274,14 +265,14 @@ object CascadingBackend { uniquePipes match { case Nil => loop(EmptyTypedPipe, rest, ds ::: descriptions) case h :: Nil => loop(h, rest, ds ::: descriptions) - case otherwise => + case h :: tail => // push all the remaining flatmaps up: - val pipes = otherwise.map(loop(_, rest, Nil)) - // make the cascading pipe // TODO: a better optimization is to not materialize this // node at all if there is no fan out since groupBy and cogroupby // can accept multiple inputs - val merged = new cascading.pipe.Merge(pipes.map(RichPipe.assignName): _*) + val headPipe = loop(h, rest, Nil) + val tailPipes = tail.map(loop(_, rest, Nil)) + val merged = RichPipe.mergeAvoidingHashes(headPipe, tailPipes) applyDescriptions(merged, ds ::: descriptions) } case src@SourcePipe(_) => diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TemplateSourceTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TemplateSourceTest.scala deleted file mode 100644 index 7ab5ff7bc2..0000000000 --- a/scalding-core/src/test/scala/com/twitter/scalding/TemplateSourceTest.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* -Copyright 2013 Inkling, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package com.twitter.scalding - -import java.io.File -import scala.io.{ Source => ScalaSource } - -import org.scalatest.{ Matchers, WordSpec } - -class TemplateTestJob(args: Args) extends Job(args) { - try { - Tsv("input", ('col1, 'col2)).read.write(TemplatedTsv("base", "%s", 'col1)) - } catch { - case e: Exception => e.printStackTrace() - } -} - -class TemplateSourceTest extends WordSpec with Matchers { - import Dsl._ - "TemplatedTsv" should { - "split output by template" in { - val input = Seq(("A", 1), ("A", 2), ("B", 3)) - - // Need to save the job to allow, find the temporary directory data was written to - var job: Job = null; - def buildJob(args: Args): Job = { - job = new TemplateTestJob(args) - job - } - - JobTest(buildJob(_)) - .source(Tsv("input", ('col1, 'col2)), input) - .runHadoop - .finish() - - val testMode = job.mode.asInstanceOf[HadoopTest] - - val directory = new File(testMode.getWritePathFor(TemplatedTsv("base", "%s", 'col1))) - - directory.listFiles().map({ _.getName() }).toSet shouldBe Set("A", "B") - - val aSource = ScalaSource.fromFile(new File(directory, "A/part-00000")) - val bSource = ScalaSource.fromFile(new File(directory, "B/part-00000")) - - aSource.getLines.toList shouldBe Seq("A\t1", "A\t2") - bSource.getLines.toList shouldBe Seq("B\t3") - } - } -} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedFieldsTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedFieldsTest.scala index 679437faae..ae0a584f22 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/TypedFieldsTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedFieldsTest.scala @@ -27,7 +27,7 @@ class TypedFieldsTest extends WordSpec with Matchers { "throw an exception if a field is not comparable" in { val thrown = the[FlowException] thrownBy untypedJob() - thrown.getMessage shouldBe "local step failed" + thrown.getMessage should startWith("local step failed") } // Now run the typed fields version diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala index 85191ad2d6..639f0fd3c6 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala @@ -818,6 +818,158 @@ class TypedMergeTest extends WordSpec with Matchers { } } +class TypedHashAndMergeJob(args: Args) extends Job(args) { + val tp = TypedPipe.from(TypedText.tsv[String]("input")) + val tp2 = TypedPipe.from(TypedText.tsv[(String, Int)]("mixin")) + + val x = tp.groupBy(x => x) + .hashJoin(tp2.group) + .values + .map { case (s, i) => s"${s}-${i}" } + + (x ++ tp) + .write(TypedText.tsv[String]("output")) +} + +class TypedHashAndMergeTest extends WordSpec with Matchers { + import Dsl._ + "A TypedMergeJob" should { + var idx = 0 + JobTest(new TypedHashAndMergeJob(_)) + .source(TypedText.tsv[String]("input"), List(Tuple1("abc"), Tuple1("def"), Tuple1("ghi"))) + .source(TypedText.tsv[(String, Int)]("mixin"), List("def" -> 2, "ghi" -> 3)) + .typedSink(TypedText.tsv[String]("output")) { outBuf => + (idx + ": correctly run despite a hash-merge situation") in { /* which isn't straightforward in Cascading */ + outBuf.toSet shouldBe Set("def-2", "ghi-3", "abc", "def", "ghi") + } + idx += 1 + } + .runHadoop + .finish() + } +} + +trait TypedComplexHashAndMergeJobBase { + def ta: TypedPipe[String] + def taXb: TypedPipe[String] + def tc: TypedPipe[String] + def td: TypedPipe[String] + def tdXe: TypedPipe[String] +} + +class TypedComplexHashAndMergeJob(args: Args, + fieldsToMerge: Seq[(String, TypedComplexHashAndMergeJobBase => TypedPipe[String])]) + extends Job(args) with TypedComplexHashAndMergeJobBase { + + override def name: String = super.name + " (" + fieldsToMerge.map(_._1).mkString(" ++ ") + ")" + + val ta = TypedPipe.from(TypedText.tsv[String]("a")) + val tb = TypedPipe.from(TypedText.tsv[(String, Int)]("b")) + + val tc = TypedPipe.from(TypedText.tsv[String]("c")) + val td = TypedPipe.from(TypedText.tsv[String]("d")) + val te = TypedPipe.from(TypedText.tsv[(String, Int)]("e")) + + val taXb: TypedPipe[String] = ta.groupBy(x => x).hashJoin(tb.group).values.map { case (s, i) => s"${s}→${i}" } + val tdXe: TypedPipe[String] = td.groupBy(x => x).hashJoin(te.group).values.map { case (s, i) => s"${s}⇒${i}" } + + fieldsToMerge.map(_._2(this)).reduce(_ ++ _) + .write(TypedText.tsv[String]("output")) +} + +class TypedComplexHashAndMergeTest extends WordSpec with Matchers { + import Dsl._ + + val fields = Seq[(String, TypedComplexHashAndMergeJobBase => TypedPipe[String])]( + ("a", _.ta), + ("a∩b", _.taXb), + ("c", _.tc), + ("d", _.td), + ("d∩e", _.tdXe)) + + val selection = fields.permutations.take(3) // Take'em all if you need to prove all permutations work equally (kind of slow, and internally we do use commutativity) + + selection.foreach(perm => { + val permName = perm.map(_._1).mkString(" ++ ") + + s"A TypedComplexHashAndMergeJob ${permName}" should { + var idx = 0 + + JobTest(new TypedComplexHashAndMergeJob(_: Args, perm)) + .source(TypedText.tsv[String]("a"), List(Tuple1("a1"), Tuple1("a2"), Tuple1("a3"), Tuple1("a4"))) + .source(TypedText.tsv[(String, Int)]("b"), List("a2" -> 2, "a3" -> 3, "a6" -> 6, "d2" -> 7, "d3" -> 8)) + .source(TypedText.tsv[String]("c"), List(Tuple1("c1"), Tuple1("c2"))) + .source(TypedText.tsv[String]("d"), List(Tuple1("d1"), Tuple1("d2"), Tuple1("d3"), Tuple1("d4"))) + .source(TypedText.tsv[(String, Int)]("e"), List("d2" -> 4, "d3" -> 5, "a2" -> 9)) + .typedSink(TypedText.tsv[String]("output")) { outBuf => + (s"${idx}: correctly run despite a hash-merge situation") in { + /* which isn't straightforward in Cascading */ + + outBuf.toSet shouldBe Set("a1", "a2", "a3", "a4", "a2→2", "a3→3", + "c1", "c2", "d1", "d2", "d3", "d4", "d2⇒4", "d3⇒5") + } + idx += 1 + } + .runWithoutNext(true) // .runHadoop but we don't want the hadoop thing to run everything at once. + .finish() + } + }) +} + +class TypedTwistedHashAndMergeJob(args: Args) extends Job(args) { + /* The purpose of this job is to find a complex case where TypedPipe#mergeAvoidingHashes might fail to stop */ + + val ta = TypedPipe.from(TypedText.tsv[String]("a")) + val tb = TypedPipe.from(TypedText.tsv[(String, Int)]("b")) + + val td = TypedPipe.from(TypedText.tsv[String]("d")) + val te = TypedPipe.from(TypedText.tsv[(String, Int)]("e")) + + val taXb: TypedPipe[String] = ta.groupBy(x => x).hashJoin(tb.group).values.map { case (s, i) => s"${s}→${i}" } + val tdXe: TypedPipe[String] = td.groupBy(x => x).hashJoin(te.group).values.map { case (s, i) => s"${s}⇒${i}" } + + val taXbXe: TypedPipe[String] = taXb.groupBy(_.split("→").head).hashJoin(te.group).values.map { case (s, i) => s"${s}→${i}" } + val tdXeXb: TypedPipe[String] = tdXe.groupBy(_.split("⇒").head).hashJoin(tb.group).values.map { case (s, i) => s"${s}⇒${i}" } + + val twistA = taXbXe.map(x => { + val y = x.split("→", 1) + (y.head, y.tail) + }).group + + val twistB = tdXeXb.map(x => { + val y = x.split("⇒", 1) + (y.head, y.tail) + }).group + + val twistAB: TypedPipe[String] = twistA.hashJoin(twistB).values.map { case (a, b) => a + "≡" + b } + val twistBA: TypedPipe[String] = twistB.hashJoin(twistA).values.map { case (a, b) => a + "≢" + b } + + (taXbXe ++ tdXeXb ++ twistAB ++ twistBA) + .write(TypedText.tsv[String]("output")) +} + +class TypedTwistedHashAndMergeTest extends WordSpec with Matchers { + import Dsl._ + + s"A TypedTwistedHashAndMergeTest" should { + var idx = 0 + + JobTest(new TypedTwistedHashAndMergeJob(_: Args)) + .source(TypedText.tsv[String]("a"), List(Tuple1("a1"), Tuple1("a2"), Tuple1("a3"), Tuple1("a4"))) + .source(TypedText.tsv[(String, Int)]("b"), List("a2" -> 2, "a3" -> 3, "a6" -> 6, "d2" -> 7, "d3" -> 8)) + .source(TypedText.tsv[String]("d"), List(Tuple1("d1"), Tuple1("d2"), Tuple1("d3"), Tuple1("d4"))) + .source(TypedText.tsv[(String, Int)]("e"), List("d2" -> 4, "d3" -> 5, "a2" -> 9)) + .typedSink(TypedText.tsv[String]("output")) { outBuf => + (s"${idx}: correctly run despite a hash-merge situation") in { + outBuf.toSet shouldBe Set("d2⇒4⇒7", "d3⇒5⇒8", "a2→2→9") + } + idx += 1 + } + .runHadoop + .finish() + } +} + class TypedShardJob(args: Args) extends Job(args) { (TypedPipe.from(TypedText.tsv[String]("input")) ++ (TypedPipe.empty.map { _ => "hey" }) ++ diff --git a/scalding-core/src/test/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorStepStrategyTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorStepStrategyTest.scala index 0b8ce6e6ec..0a2a34e767 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorStepStrategyTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorStepStrategyTest.scala @@ -1,7 +1,7 @@ package com.twitter.scalding.estimation.memory import org.apache.hadoop.mapred.JobConf -import org.scalatest.{Matchers, WordSpec} +import org.scalatest.{ Matchers, WordSpec } class MemoryEstimatorStepStrategyTest extends WordSpec with Matchers { "A Memory estimator step strategy" should { @@ -28,8 +28,9 @@ class MemoryEstimatorStepStrategyTest extends WordSpec with Matchers { def confWith(values: Map[String, String]): JobConf = { val conf = new JobConf(false) - values.foreach { case (k, v) => - conf.set(k, v) + values.foreach { + case (k, v) => + conf.set(k, v) } conf diff --git a/scalding-core/src/test/scala/com/twitter/scalding/estimation/memory/SmoothedHistoryMemoryEstimatorTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/estimation/memory/SmoothedHistoryMemoryEstimatorTest.scala index 8184e1d369..0173a9a16f 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/estimation/memory/SmoothedHistoryMemoryEstimatorTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/estimation/memory/SmoothedHistoryMemoryEstimatorTest.scala @@ -1,12 +1,12 @@ package com.twitter.scalding.estimation.memory import cascading.flow.FlowStep -import com.twitter.scalding.estimation.{FlowStepHistory, FlowStrategyInfo, HistoryService, Task} +import com.twitter.scalding.estimation.{ FlowStepHistory, FlowStrategyInfo, HistoryService, Task } import org.apache.hadoop.mapred.JobConf import org.mockito.Mockito._ import org.mockito.Matchers._ -import org.scalatest.{Matchers, WordSpec} -import scala.util.{Success, Try} +import org.scalatest.{ Matchers, WordSpec } +import scala.util.{ Success, Try } class SmoothedHistoryMemoryEstimatorTest extends WordSpec with Matchers { import Utils._ @@ -19,8 +19,7 @@ class SmoothedHistoryMemoryEstimatorTest extends WordSpec with Matchers { "estimate correct numbers for only reducers" in { val estimation = SmoothedMemoryEstimator .makeHistory(Seq( - "REDUCE" -> 1024.megabytes - )) + "REDUCE" -> 1024.megabytes)) .estimate(TestFlowStrategyInfo.dummy) estimation shouldBe reduceEstimate((1228, 1536)) @@ -29,8 +28,7 @@ class SmoothedHistoryMemoryEstimatorTest extends WordSpec with Matchers { "estimate correct numbers for only mappers" in { val estimation = SmoothedMemoryEstimator .makeHistory(Seq( - "MAP" -> 1024.megabytes - )) + "MAP" -> 1024.megabytes)) .estimate(TestFlowStrategyInfo.dummy) estimation shouldBe mapEstimate((1228, 1536)) @@ -46,8 +44,7 @@ class SmoothedHistoryMemoryEstimatorTest extends WordSpec with Matchers { "MAP" -> 1300.megabytes, "REDUCE" -> 1300.megabytes, "MAP" -> 723.megabytes, - "REDUCE" -> 723.megabytes - )) + "REDUCE" -> 723.megabytes)) .estimate(TestFlowStrategyInfo.dummy) estimation shouldBe Some(MemoryEstimate(Some((1228, 1536)), Some((1228, 1536)))) @@ -57,14 +54,12 @@ class SmoothedHistoryMemoryEstimatorTest extends WordSpec with Matchers { val conf = TestFlowStrategyInfo.dummy.step.getConfig val estimation = SmoothedMemoryEstimator .makeHistory(Seq( - "MAP" -> (MemoryEstimatorConfig.getMaxContainerMemory(conf).megabyte + 1.gigabyte) - )) + "MAP" -> (MemoryEstimatorConfig.getMaxContainerMemory(conf).megabyte + 1.gigabyte))) .estimate(TestFlowStrategyInfo.dummy) val expectedEstimation = ( (MemoryEstimatorConfig.getMaxContainerMemory(conf) / MemoryEstimatorConfig.getXmxScaleFactor(conf)).toLong, - MemoryEstimatorConfig.getMaxContainerMemory(conf) - ) + MemoryEstimatorConfig.getMaxContainerMemory(conf)) estimation shouldBe mapEstimate(expectedEstimation) } @@ -73,14 +68,12 @@ class SmoothedHistoryMemoryEstimatorTest extends WordSpec with Matchers { val conf = TestFlowStrategyInfo.dummy.step.getConfig val estimation = SmoothedMemoryEstimator .makeHistory(Seq( - "MAP" -> (MemoryEstimatorConfig.getMinContainerMemory(conf).megabyte - 500.megabyte) - )) + "MAP" -> (MemoryEstimatorConfig.getMinContainerMemory(conf).megabyte - 500.megabyte))) .estimate(TestFlowStrategyInfo.dummy) val expectedEstimation = ( (MemoryEstimatorConfig.getMinContainerMemory(conf) / MemoryEstimatorConfig.getXmxScaleFactor(conf)).toLong, - MemoryEstimatorConfig.getMinContainerMemory(conf) - ) + MemoryEstimatorConfig.getMinContainerMemory(conf)) estimation shouldBe mapEstimate(expectedEstimation) } @@ -94,39 +87,36 @@ object EmptyHistoryService extends HistoryService { class DummyHistoryService(val history: Seq[(String, Long)]) extends HistoryService { override def fetchHistory(info: FlowStrategyInfo, maxHistory: Int): Try[Seq[FlowStepHistory]] = { - Success(history.map { case (taskType, memory) => - val task = Task( - details = Map( - Task.TaskType -> taskType - ), - counters = Map( - SmoothedHistoryMemoryEstimator.CommittedHeapBytes -> memory - ) - ) - val tasks = Seq(task) - FlowStepHistory( - keys = null, - submitTimeMillis = 0, - launchTimeMillis = 0L, - finishTimeMillis = 0L, - totalMaps = 0L, - totalReduces = 0L, - finishedMaps = 0L, - finishedReduces = 0L, - failedMaps = 0L, - failedReduces = 0L, - mapFileBytesRead = 0L, - mapFileBytesWritten = 0L, - mapOutputBytes = 0l, - reduceFileBytesRead = 0l, - hdfsBytesRead = 0l, - hdfsBytesWritten = 0L, - mapperTimeMillis = 0L, - reducerTimeMillis = 0L, - reduceShuffleBytes = 0L, - cost = 1.1, - tasks = tasks - ) + Success(history.map { + case (taskType, memory) => + val task = Task( + details = Map( + Task.TaskType -> taskType), + counters = Map( + SmoothedHistoryMemoryEstimator.CommittedHeapBytes -> memory)) + val tasks = Seq(task) + FlowStepHistory( + keys = null, + submitTimeMillis = 0, + launchTimeMillis = 0L, + finishTimeMillis = 0L, + totalMaps = 0L, + totalReduces = 0L, + finishedMaps = 0L, + finishedReduces = 0L, + failedMaps = 0L, + failedReduces = 0L, + mapFileBytesRead = 0L, + mapFileBytesWritten = 0L, + mapOutputBytes = 0l, + reduceFileBytesRead = 0l, + hdfsBytesRead = 0l, + hdfsBytesWritten = 0L, + mapperTimeMillis = 0L, + reducerTimeMillis = 0L, + reduceShuffleBytes = 0L, + cost = 1.1, + tasks = tasks) }) } } diff --git a/scalding-estimators-test/src/test/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorTest.scala b/scalding-estimators-test/src/test/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorTest.scala index ee4d65b123..30438ee339 100644 --- a/scalding-estimators-test/src/test/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorTest.scala +++ b/scalding-estimators-test/src/test/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorTest.scala @@ -143,9 +143,9 @@ class CustomHistoryService(val history: JobConf => Seq[(String, Long)]) extends import Utils._ override def fetchHistory(info: FlowStrategyInfo, maxHistory: Int): Try[Seq[FlowStepHistory]] = { - if (info.step.getStepNum == 1) { + if (info.step.getOrdinal == 0) { makeHistory(info.step.getConfig, history) - } else if (info.step.getStepNum == 2) { + } else if (info.step.getOrdinal == 1) { Success(Nil) } else { makeHistory(info.step.getConfig, _ => Seq( diff --git a/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala index 4ec2395be7..28ea9e075f 100644 --- a/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala +++ b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala @@ -135,7 +135,6 @@ class LocalCluster(mutex: Boolean = true) { classOf[com.twitter.chill.algebird.AveragedValueSerializer], classOf[com.twitter.algebird.Semigroup[_]], classOf[com.twitter.chill.KryoInstantiator], - classOf[org.jgrapht.ext.EdgeNameProvider[_]], classOf[org.apache.commons.lang.StringUtils], classOf[cascading.scheme.local.TextDelimited], classOf[org.apache.commons.logging.LogFactory], diff --git a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/platform/PlatformTest.scala b/scalding-hadoop-test/src/test/scala/com/twitter/scalding/platform/PlatformTest.scala index 7f84da1fbf..dff7cc316a 100644 --- a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/platform/PlatformTest.scala +++ b/scalding-hadoop-test/src/test/scala/com/twitter/scalding/platform/PlatformTest.scala @@ -37,26 +37,147 @@ class InAndOutJob(args: Args) extends Job(args) { } object TinyJoinAndMergeJob { - val peopleInput = TypedTsv[Int]("input1") - val peopleData = List(1, 2, 3, 4) + val joinInput1 = TypedTsv[Int]("input1") + val joinData1 = List(1, 2, 3, 4) - val messageInput = TypedTsv[Int]("input2") - val messageData = List(1, 2, 3) + val joinInput2 = TypedTsv[Int]("input2") + val joinData2 = List(1, 2, 3) + + val mergerInput = TypedTsv[Int]("input3") + val mergerData = List(1, 2, 3, 4) val output = TypedTsv[(Int, Int)]("output") + val output2 = TypedTsv[(Int, Int)]("output2") val outputData = List((1, 2), (2, 2), (3, 2), (4, 1)) } class TinyJoinAndMergeJob(args: Args) extends Job(args) { import TinyJoinAndMergeJob._ - val people = peopleInput.read.mapTo(0 -> 'id) { v: Int => v } + val input1 = joinInput1.read.mapTo(0 -> 'id) { v: Int => v } + + val joinedData = joinInput2.read + .mapTo(0 -> 'id) { v: Int => v } + .joinWithTiny('id -> 'id, input1) + + val mergerData = mergerInput.read.mapTo(0 -> 'id) { v: Int => v } + + (mergerData ++ joinedData).groupBy('id) { _.size('count) }.write(output) +} + +// Verifies fix for https://github.com/cwensel/cascading/pull/53 +class MergeTwoSinksForceToDiskJob(args: Args) extends Job(args) { + import TinyJoinAndMergeJob._ + + val input1 = joinInput1.read.mapTo(0 -> 'id) { v: Int => v } + val input2 = joinInput2.read.mapTo(0 -> 'id) { v: Int => v } + + val merged = (input1 ++ input2).groupBy('id) { _.size('count) } + + merged + .project('id, 'count) + .forceToDisk + .write(output) + + merged + .write(output2) +} + +class MergeTwoSinksForceToDiskTypedJob(args: Args) extends Job(args) { + import TinyJoinAndMergeJob._ + + val input1 = TypedPipe.from(joinInput1) + val input2 = TypedPipe.from(joinInput2) + + val merged = (input1 ++ input2).asKeys.group.size.map { case (k, v) => (k, v.toInt) } + + merged + .forceToDisk + .write(output) + + merged + .write(output2) +} + +class TinyJoinAndSelfMergeJob(args: Args) extends Job(args) { + import TinyJoinAndMergeJob._ + + val input1 = joinInput1.read.mapTo(0 -> 'id) { v: Int => v } + + val joined = joinInput2.read + .mapTo(0 -> 'id) { v: Int => v } + .joinWithTiny('id -> 'id, input1) + .flatMapTo('id -> 'id) { v: Int => Some(v) } // test Each traversal + + // merging the output of a hashjoin with one of its inputs is + // no longer supported in cascading3. + // scalding should put in a explicit checkpoint + // and this should pass + (joined ++ input1).groupBy('id) { _.size('count) }.write(output) +} + +// same as TinyJoinAndSelfMergeJob, but with ++ merge operation order swapped +class TinyJoinAndSelfMergeJob2(args: Args) extends Job(args) { + import TinyJoinAndMergeJob._ + + val input1 = joinInput1.read.mapTo(0 -> 'id) { v: Int => v } - val messages = messageInput.read + val joined = joinInput2.read .mapTo(0 -> 'id) { v: Int => v } - .joinWithTiny('id -> 'id, people) + .joinWithTiny('id -> 'id, input1) + .flatMapTo('id -> 'id) { v: Int => Some(v) } // test Each traversal + + // merging the output of a hashjoin with one of its inputs is + // no longer supported in cascading3. + // scalding should put in a explicit checkpoint + // and this should pass + (input1 ++ joined).groupBy('id) { _.size('count) }.write(output) +} + +class TinyJoinAndSelfMergeForceToDiskJob(args: Args) extends Job(args) { + import TinyJoinAndMergeJob._ + + val input1 = joinInput1.read.mapTo(0 -> 'id) { v: Int => v } + + val joined = joinInput2.read + .mapTo(0 -> 'id) { v: Int => v } + .joinWithTiny('id -> 'id, input1) + .forceToDisk + // user supplied forceToDisk in addition to the one scalding + // adds under the hood + + (joined ++ input1).groupBy('id) { _.size('count) }.write(output) +} + +// same as TinyJoinAndSelfMergeJob, but using typed api +class TinyJoinAndSelfMergeJobTyped(args: Args) extends Job(args) { + import TinyJoinAndMergeJob._ + + val input1 = TypedPipe.from(joinInput1) + + val joined = TypedPipe.from(joinInput2) + .asKeys + .hashJoin(input1.asKeys) + .keys + + (joined ++ input1).asKeys.size.map { case (k, v) => (k, v.toInt) }.write(output) +} + +// same as TinyJoinAndSelfMergeForceToDiskJob, but using typed api +class TinyJoinAndSelfMergeForceToDiskJobTyped(args: Args) extends Job(args) { + import TinyJoinAndMergeJob._ + + val input1 = TypedPipe.from(joinInput1) - (messages ++ people).groupBy('id) { _.size('count) }.write(output) + val joined = TypedPipe.from(joinInput2) + .asKeys + .hashJoin(input1.asKeys) + .keys + .forceToDisk + // user supplied forceToDisk in addition to the one scalding + // adds under the hood + + (joined ++ input1).asKeys.size.map { case (k, v) => (k, v.toInt) }.write(output) } object TsvNoCacheJob { @@ -403,6 +524,46 @@ class ReadPathJob(args: Args) extends Job(args) { .write(NullSink) } +// Based on a user job that fails in Cascading3 without fix: https://github.com/cwensel/cascading/pull/57 +// Results in a groupBy which inputs to a coGroup1. The groupBy and coGroup1 are used as inputs to +// another coGroup2. Without this fix, the Cascading planner loses one of the Each operations between +// this triangle. +object GroupByCoGroupCoGroupTriangleJob { + val output = TypedTsv[(String, Int)]("output") + + val inputData = List(("A", Seq(1, 2)), ("B", Seq(3, 4)), ("B", Seq(5, 6)), ("A", Seq(1, 2))) + val deleteList = List(1, 2) + val expectedOutput = List(("B", 3), ("B", 4), ("B", 5), ("B", 6)) +} + +class GroupByCoGroupCoGroupTriangleJob(args: Args) extends Job(args) { + import GroupByCoGroupCoGroupTriangleJob._ + + val inputTP = TypedPipe.from(inputData) + val deleteTP = TypedPipe.from(deleteList) + + val groupedValues: TypedPipe[(String, Seq[Int])] = + inputTP + .groupBy(_._1) + .mapValueStream(x => x) + .values + + val tuplesToDel = + groupedValues + .flatMap { case (str, seq) => seq.map { userId => (userId, str) } } + .join(deleteTP.asKeys) + .toTypedPipe + .map { case (userId, (name, _)) => (name, userId) } + + groupedValues + .groupBy(_._1) + .leftJoin(tuplesToDel) + .filter { case (name, (_, isPartOfDeletedSet)) => isPartOfDeletedSet.isEmpty } + .values + .flatMap { case (tuple, _) => tuple._2.map { id => (tuple._1, id) } } + .write(output) +} + object PlatformTest { def setAutoForceRight(mode: Mode, autoForce: Boolean): Unit = { mode match { @@ -450,9 +611,137 @@ class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest "merge and joinWithTiny shouldn't duplicate data" in { HadoopPlatformJobTest(new TinyJoinAndMergeJob(_), cluster) - .source(peopleInput, peopleData) - .source(messageInput, messageData) + .source(joinInput1, joinData1) + .source(joinInput2, joinData2) + .source(mergerInput, mergerData) + .sink(output) { _.toSet shouldBe (outputData.toSet) } + .inspectCompletedFlow { flow => + val steps = flow.getFlowSteps.asScala + steps should have size 1 + } + .run() + } + } + + "A MergeTwoSinksForceToDiskJob" should { + import TinyJoinAndMergeJob._ + + "merge and write to two sinks with forceToDisk" in { + HadoopPlatformJobTest(new MergeTwoSinksForceToDiskJob(_), cluster) + .source(joinInput1, joinData1) + .source(joinInput2, joinData2) + .sink(output) { _.toSet == outputData.toSet } + .sink(output2) { _.toSet == outputData.toSet } + .inspectCompletedFlow { flow => + val steps = flow.getFlowSteps.asScala + steps should have size 4 + } + .run() + } + } + + "A MergeTwoSinksForceToDiskTypedJob" should { + import TinyJoinAndMergeJob._ + + "merge and write to two sinks with forceToDisk" in { + HadoopPlatformJobTest(new MergeTwoSinksForceToDiskTypedJob(_), cluster) + .source(joinInput1, joinData1) + .source(joinInput2, joinData2) + .sink(output) { _.toSet == outputData.toSet } + .sink(output2) { _.toSet == outputData.toSet } + .inspectCompletedFlow { flow => + val steps = flow.getFlowSteps.asScala + steps should have size 3 + } + .run() + } + } + + "A TinyJoinAndSelfMergeJob" should { + import TinyJoinAndMergeJob._ + + "work correctly without explicit forceToDisk " in { + HadoopPlatformJobTest(new TinyJoinAndSelfMergeJob(_), cluster) + .source(joinInput1, joinData1) + .source(joinInput2, joinData2) + .sink(output) { _.toSet shouldBe (outputData.toSet) } + .inspectCompletedFlow { flow => + val steps = flow.getFlowSteps.asScala + steps should have size 2 + // two steps given we auto checkpoint before the merge + } + .run() + } + } + + "A TinyJoinAndSelfMergeJob2" should { + import TinyJoinAndMergeJob._ + + "work correctly without explicit forceToDisk " in { + HadoopPlatformJobTest(new TinyJoinAndSelfMergeJob2(_), cluster) + .source(joinInput1, joinData1) + .source(joinInput2, joinData2) + .sink(output) { _.toSet shouldBe (outputData.toSet) } + .inspectCompletedFlow { flow => + val steps = flow.getFlowSteps.asScala + steps should have size 2 + // two steps given we auto checkpoint before the merge + } + .run() + } + } + + "A TinyJoinAndSelfMergeForceToDiskJob" should { + import TinyJoinAndMergeJob._ + + "run correctly with explicit forceToDisk" in { + HadoopPlatformJobTest(new TinyJoinAndSelfMergeForceToDiskJob(_), cluster) + .source(joinInput1, joinData1) + .source(joinInput2, joinData2) + .sink(output) { _.toSet shouldBe (outputData.toSet) } + .inspectCompletedFlow { flow => + val steps = flow.getFlowSteps.asScala + steps should have size 2 + // two steps given we auto checkpoint before the merge + // user supplied forceToDisk should not add a third step + } + .run() + } + } + + "A TinyJoinAndSelfMergeJobTyped" should { + import TinyJoinAndMergeJob._ + + "work correctly without explicit forceToDisk " in { + HadoopPlatformJobTest(new TinyJoinAndSelfMergeJobTyped(_), cluster) + .source(joinInput1, joinData1) + .source(joinInput2, joinData2) .sink(output) { _.toSet shouldBe (outputData.toSet) } + .inspectCompletedFlow { flow => + val steps = flow.getFlowSteps.asScala + steps should have size 2 + // two steps given we auto checkpoint before the merge + } + .run() + } + } + + "A TinyJoinAndSelfMergeForceToDiskJobTyped" should { + import TinyJoinAndMergeJob._ + + "run correctly with explicit forceToDisk" in { + HadoopPlatformJobTest(new TinyJoinAndSelfMergeForceToDiskJobTyped(_), cluster) + .source(joinInput1, joinData1) + .source(joinInput2, joinData2) + .sink(output) { + _.toSet shouldBe (outputData.toSet) + } + .inspectCompletedFlow { flow => + val steps = flow.getFlowSteps.asScala + steps should have size 2 + // two steps given we auto checkpoint before the merge + // user supplied forceToDisk should not add a third step + } .run() } } @@ -769,4 +1058,14 @@ class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest assert(Option(result.getCause).exists(_.isInstanceOf[InvalidSourceException])) } } + + "A GroupByCoGroupCoGroupTriangle job" should { + import GroupByCoGroupCoGroupTriangleJob._ + + "do a groupBy along with two coGroups and not lose an Each operation" in { + HadoopPlatformJobTest(new GroupByCoGroupCoGroupTriangleJob(_), cluster) + .sink[(String, Int)]("output") { _.toList shouldBe expectedOutput } + .run() + } + } } diff --git a/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryService.scala b/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryService.scala index bfd74af7af..a599e6c1e2 100644 --- a/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryService.scala +++ b/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryService.scala @@ -101,8 +101,7 @@ trait HRavenHistoryService extends HistoryService { signature: String, stepNum: Int, max: Int, - nFetch: Int - ): Try[Seq[Flow]] = + nFetch: Int): Try[Seq[Flow]] = Try(client .fetchFlowsWithConfig(cluster, user, batch, signature, nFetch, RequiredJobConfigs: _*)) .flatMap { flows => @@ -131,10 +130,10 @@ trait HRavenHistoryService extends HistoryService { successfulFlows } }.recoverWith { - case e: IOException => - LOG.error("Error making API request to hRaven. HRavenHistoryService will be disabled.") - Failure(e) - } + case e: IOException => + LOG.error("Error making API request to hRaven. HRavenHistoryService will be disabled.") + Failure(e) + } /** * Fetch info from hRaven for the last time the given JobStep ran. @@ -146,7 +145,7 @@ trait HRavenHistoryService extends HistoryService { */ def fetchPastJobDetails(step: FlowStep[JobConf], max: Int): Try[Seq[JobDetails]] = { val conf = step.getConfig - val stepNum = step.getStepNum + val stepNum = step.getOrdinal def findMatchingJobStep(pastFlow: Flow) = pastFlow.getJobs.asScala.find { step => diff --git a/scalding-hraven/src/test/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryServiceTest.scala b/scalding-hraven/src/test/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryServiceTest.scala index 734aa755e7..0d8edd9a0e 100644 --- a/scalding-hraven/src/test/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryServiceTest.scala +++ b/scalding-hraven/src/test/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryServiceTest.scala @@ -83,7 +83,7 @@ object TestFlowStrategyInfo { val mockedInfo = mock(classOf[FlowStrategyInfo]) when(mockedStep.getConfig).thenReturn(mockedConf) - when(mockedStep.getStepNum).thenReturn(stepNum) + when(mockedStep.getOrdinal).thenReturn(stepNum) when(mockedInfo.step).thenReturn(mockedStep) mockedInfo diff --git a/scalding-jdbc/src/main/scala/com/twitter/scalding/jdbc/JDBCDriver.scala b/scalding-jdbc/src/main/scala/com/twitter/scalding/jdbc/JDBCDriver.scala index 3d9b4f70fc..243a2128aa 100644 --- a/scalding-jdbc/src/main/scala/com/twitter/scalding/jdbc/JDBCDriver.scala +++ b/scalding-jdbc/src/main/scala/com/twitter/scalding/jdbc/JDBCDriver.scala @@ -10,7 +10,7 @@ trait JdbcDriver { tableName: TableName, columnNames: Array[ColumnName], columnDefinitions: Array[Definition]) = - new TableDesc(tableName.get, columnNames.map(_.get), columnDefinitions.map(_.get), null, null) + new TableDesc(tableName.get, columnNames.map(_.get), columnDefinitions.map(_.get), null) def getJDBCScheme( columnNames: Array[ColumnName], filterCondition: Option[String], @@ -37,8 +37,7 @@ trait MysqlDriver extends JdbcDriver with MysqlTableCreationImplicits { tableName.get, columnNames.map(_.get), columnDefinitions.map(_.get), - null, - "SHOW TABLES LIKE '%s'") + null) override def getJDBCScheme( columnNames: Array[ColumnName], filterCondition: Option[String], diff --git a/scalding-json/src/main/scala/com/twitter/scalding/TypedJson.scala b/scalding-json/src/main/scala/com/twitter/scalding/TypedJson.scala index 1dde3d1648..582b74237d 100644 --- a/scalding-json/src/main/scala/com/twitter/scalding/TypedJson.scala +++ b/scalding-json/src/main/scala/com/twitter/scalding/TypedJson.scala @@ -2,7 +2,7 @@ package com.twitter.scalding import com.twitter.bijection.{ Injection, AbstractInjection } import com.twitter.bijection.Inversion._ -import com.twitter.elephantbird.cascading2.scheme.LzoTextLine +import com.twitter.elephantbird.cascading3.scheme.LzoTextLine import org.json4s._ import org.json4s.native.Serialization._ diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/ParquetValueScheme.java b/scalding-parquet-cascading/src/main/java/com/twitter/scalding/parquet/cascading/ParquetValueScheme.java similarity index 92% rename from scalding-parquet/src/main/java/com/twitter/scalding/parquet/ParquetValueScheme.java rename to scalding-parquet-cascading/src/main/java/com/twitter/scalding/parquet/cascading/ParquetValueScheme.java index 2d71c44896..169c1f24e9 100644 --- a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/ParquetValueScheme.java +++ b/scalding-parquet-cascading/src/main/java/com/twitter/scalding/parquet/cascading/ParquetValueScheme.java @@ -1,4 +1,4 @@ -package com.twitter.scalding.parquet; +package com.twitter.scalding.parquet.cascading; import java.io.IOException; import java.io.Serializable; @@ -120,7 +120,7 @@ private void setPredicatePushdown(JobConf jobConf) { } } @Override - public void sourceConfInit(FlowProcess jobConfFlowProcess, Tap jobConfRecordReaderOutputCollectorTap, final JobConf jobConf) { + public void sourceConfInit(FlowProcess jobConfFlowProcess, Tap jobConfRecordReaderOutputCollectorTap, final JobConf jobConf) { setPredicatePushdown(jobConf); setProjectionPushdown(jobConf); setStrictProjectionPushdown(jobConf); @@ -135,7 +135,7 @@ private void setRecordClass(JobConf jobConf) { @SuppressWarnings("unchecked") @Override - public boolean source(FlowProcess fp, SourceCall sc) + public boolean source(FlowProcess fp, SourceCall sc) throws IOException { Container value = (Container) sc.getInput().createValue(); boolean hasNext = sc.getInput().next(null, value); @@ -150,7 +150,7 @@ public boolean source(FlowProcess fp, SourceCall fp, SinkCall sc) + public void sink(FlowProcess fp, SinkCall sc) throws IOException { TupleEntry tuple = sc.getOutgoingEntry(); diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/thrift/ParquetTBaseScheme.java b/scalding-parquet-cascading/src/main/java/com/twitter/scalding/parquet/cascading/thrift/ParquetTBaseScheme.java similarity index 90% rename from scalding-parquet/src/main/java/com/twitter/scalding/parquet/thrift/ParquetTBaseScheme.java rename to scalding-parquet-cascading/src/main/java/com/twitter/scalding/parquet/cascading/thrift/ParquetTBaseScheme.java index d62596b98d..732eb5d7f9 100644 --- a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/thrift/ParquetTBaseScheme.java +++ b/scalding-parquet-cascading/src/main/java/com/twitter/scalding/parquet/cascading/thrift/ParquetTBaseScheme.java @@ -1,6 +1,6 @@ -package com.twitter.scalding.parquet.thrift; +package com.twitter.scalding.parquet.cascading.thrift; -import com.twitter.scalding.parquet.ParquetValueScheme; +import com.twitter.scalding.parquet.cascading.ParquetValueScheme; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; @@ -41,7 +41,7 @@ public ParquetTBaseScheme(ParquetValueScheme.Config config) { } @Override - public void sourceConfInit(FlowProcess fp, + public void sourceConfInit(FlowProcess fp, Tap tap, JobConf jobConf) { super.sourceConfInit(fp, tap, jobConf); jobConf.setInputFormat(DeprecatedParquetInputFormat.class); @@ -50,7 +50,7 @@ public void sourceConfInit(FlowProcess fp, } @Override - public void sinkConfInit(FlowProcess fp, + public void sinkConfInit(FlowProcess fp, Tap tap, JobConf jobConf) { if (this.config.getKlass() == null) { diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverter.java b/scalding-parquet-cascading/src/main/java/com/twitter/scalding/parquet/cascading/tuple/ParquetTupleConverter.java similarity index 97% rename from scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverter.java rename to scalding-parquet-cascading/src/main/java/com/twitter/scalding/parquet/cascading/tuple/ParquetTupleConverter.java index 4f313d7392..8fb922560c 100644 --- a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverter.java +++ b/scalding-parquet-cascading/src/main/java/com/twitter/scalding/parquet/cascading/tuple/ParquetTupleConverter.java @@ -1,4 +1,4 @@ -package com.twitter.scalding.parquet.tuple; +package com.twitter.scalding.parquet.cascading.tuple; import cascading.tuple.Tuple; diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleScheme.java b/scalding-parquet-cascading/src/main/java/com/twitter/scalding/parquet/cascading/tuple/ParquetTupleScheme.java similarity index 89% rename from scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleScheme.java rename to scalding-parquet-cascading/src/main/java/com/twitter/scalding/parquet/cascading/tuple/ParquetTupleScheme.java index 203f3cd67c..25f6566ce0 100644 --- a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleScheme.java +++ b/scalding-parquet-cascading/src/main/java/com/twitter/scalding/parquet/cascading/tuple/ParquetTupleScheme.java @@ -1,4 +1,4 @@ -package com.twitter.scalding.parquet.tuple; +package com.twitter.scalding.parquet.cascading.tuple; import java.io.IOException; import java.util.List; @@ -83,7 +83,7 @@ public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String s @SuppressWarnings("rawtypes") @Override - public void sourceConfInit(FlowProcess fp, + public void sourceConfInit(FlowProcess fp, Tap tap, JobConf jobConf) { if (filterPredicate != null) { @@ -96,7 +96,7 @@ public void sourceConfInit(FlowProcess fp, } @Override - public Fields retrieveSourceFields(FlowProcess flowProcess, Tap tap) { + public Fields retrieveSourceFields(FlowProcess flowProcess, Tap tap) { MessageType schema = readSchema(flowProcess, tap); SchemaIntersection intersection = new SchemaIntersection(schema, getSourceFields()); @@ -105,7 +105,7 @@ public Fields retrieveSourceFields(FlowProcess flowProcess, Tap tap) { return getSourceFields(); } - private MessageType readSchema(FlowProcess flowProcess, Tap tap) { + private MessageType readSchema(FlowProcess flowProcess, Tap tap) { try { Hfs hfs; @@ -126,7 +126,7 @@ private MessageType readSchema(FlowProcess flowProcess, Tap tap) { } } - private List