diff --git a/.gitignore b/.gitignore index 8421ffc531..bfe900179a 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,10 @@ tutorial/data/rightDiff.tsv tutorial/data/tmp3.tsv tutorial/data/jsonoutput0.tsv tutorial/data/avrooutput0.avro +tutorial/data/execution_output.txt .scalding_repl scalding-hadoop-test/NOTICE NOTICE +*~ +build/ +.staging \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index a9a5a1ec11..8942f39fc0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -46,6 +46,22 @@ matrix: env: BUILD="base" TEST_TARGET="scalding-hadoop-test" script: "scripts/run_test.sh" + - scala: 2.10.6 + env: BUILD="base" TEST_TARGET="scalding-hadoop2-mr1-test" + script: "scripts/run_test.sh" + + - scala: 2.11.7 + env: BUILD="base" TEST_TARGET="scalding-hadoop2-mr1-test" + script: "scripts/run_test.sh" + + - scala: 2.10.6 + env: BUILD="base" TEST_TARGET="scalding-hadoop2-tez-test" + script: "scripts/run_test.sh" + + - scala: 2.11.7 + env: BUILD="base" TEST_TARGET="scalding-hadoop2-tez-test" + script: "scripts/run_test.sh" + - scala: 2.10.6 env: BUILD="base" TEST_TARGET="scalding-serialization" script: "scripts/run_test.sh" diff --git a/maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java b/maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java index 1aefbbb49f..e31d87a190 100644 --- a/maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java +++ b/maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java @@ -5,6 +5,7 @@ import java.util.Properties; import java.util.logging.Logger; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; @@ -45,18 +46,18 @@ public class LocalTap extends Tap scheme, + public LocalTap(String path, Scheme scheme, SinkMode sinkMode) { super(new LocalScheme(scheme), sinkMode); setup(path, scheme); } - public LocalTap(String path, Scheme scheme) { + public LocalTap(String path, Scheme scheme) { super(new LocalScheme(scheme)); setup(path, scheme); } - private void setup(String path, Scheme scheme) { + private void setup(String path, Scheme scheme) { this.path = path; /* @@ -90,13 +91,13 @@ public String getIdentifier() { } @Override - public TupleEntryIterator openForRead(FlowProcess flowProcess, RecordReader input) throws IOException { + public TupleEntryIterator openForRead(FlowProcess flowProcess, RecordReader input) throws IOException { JobConf jobConf = mergeDefaults("LocalTap#openForRead", flowProcess.getConfigCopy(), defaults); return lfs.openForRead(new HadoopFlowProcess(jobConf)); } @Override - public TupleEntryCollector openForWrite(FlowProcess flowProcess, OutputCollector output) + public TupleEntryCollector openForWrite(FlowProcess flowProcess, OutputCollector output) throws IOException { JobConf jobConf = mergeDefaults("LocalTap#openForWrite", flowProcess.getConfigCopy(), defaults); return lfs.openForWrite(new HadoopFlowProcess(jobConf)); @@ -141,11 +142,11 @@ private static class LocalScheme extends Scheme { private static final long serialVersionUID = 5710119342340369543L; - private Scheme scheme; + private Scheme scheme; private JobConf defaults; private Lfs lfs; - public LocalScheme(Scheme scheme) { + public LocalScheme(Scheme scheme) { super(scheme.getSourceFields(), scheme.getSinkFields()); this.scheme = scheme; } @@ -159,19 +160,19 @@ private void setLfs(Lfs lfs) { } @Override - public Fields retrieveSourceFields(FlowProcess flowProcess, + public Fields retrieveSourceFields(FlowProcess flowProcess, Tap tap) { return scheme.retrieveSourceFields(new HadoopFlowProcess(defaults), lfs); } @Override - public void presentSourceFields(FlowProcess flowProcess, + public void presentSourceFields(FlowProcess flowProcess, Tap tap, Fields fields) { scheme.presentSourceFields(new HadoopFlowProcess(defaults), lfs, fields); } @Override - public void sourceConfInit(FlowProcess flowProcess, + public void sourceConfInit(FlowProcess flowProcess, Tap tap, Properties conf) { JobConf jobConf = mergeDefaults("LocalScheme#sourceConfInit", conf, defaults); scheme.sourceConfInit(new HadoopFlowProcess(jobConf), lfs, jobConf); @@ -179,19 +180,19 @@ public void sourceConfInit(FlowProcess flowProcess, } @Override - public Fields retrieveSinkFields(FlowProcess flowProcess, + public Fields retrieveSinkFields(FlowProcess flowProcess, Tap tap) { return scheme.retrieveSinkFields(new HadoopFlowProcess(defaults), lfs); } @Override - public void presentSinkFields(FlowProcess flowProcess, + public void presentSinkFields(FlowProcess flowProcess, Tap tap, Fields fields) { scheme.presentSinkFields(new HadoopFlowProcess(defaults), lfs, fields); } @Override - public void sinkConfInit(FlowProcess flowProcess, + public void sinkConfInit(FlowProcess flowProcess, Tap tap, Properties conf) { JobConf jobConf = mergeDefaults("LocalScheme#sinkConfInit", conf, defaults); scheme.sinkConfInit(new HadoopFlowProcess(jobConf), lfs, jobConf); @@ -199,13 +200,13 @@ public void sinkConfInit(FlowProcess flowProcess, } @Override - public boolean source(FlowProcess flowProcess, SourceCall sourceCall) + public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException { throw new RuntimeException("LocalTap#source is never called"); } @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall) + public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { throw new RuntimeException("LocalTap#sink is never called"); } diff --git a/maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java b/maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java index 0f830ede86..6dfa4ff7ff 100644 --- a/maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java +++ b/maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java @@ -154,7 +154,7 @@ public String[] getFamilyNames() { } @Override - public void sourcePrepare(FlowProcess flowProcess, + public void sourcePrepare(FlowProcess flowProcess, SourceCall sourceCall) { Object[] pair = new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()}; @@ -163,13 +163,13 @@ public void sourcePrepare(FlowProcess flowProcess, } @Override - public void sourceCleanup(FlowProcess flowProcess, + public void sourceCleanup(FlowProcess flowProcess, SourceCall sourceCall) { sourceCall.setContext(null); } @Override - public boolean source(FlowProcess flowProcess, + public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException { Tuple result = new Tuple(); @@ -206,7 +206,7 @@ public boolean source(FlowProcess flowProcess, } @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall) + public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); OutputCollector outputCollector = sinkCall.getOutput(); @@ -231,7 +231,7 @@ public void sink(FlowProcess flowProcess, SinkCall process, + public void sinkConfInit(FlowProcess process, Tap tap, JobConf conf) { conf.setOutputFormat(TableOutputFormat.class); @@ -240,7 +240,7 @@ public void sinkConfInit(FlowProcess process, } @Override - public void sourceConfInit(FlowProcess process, + public void sourceConfInit(FlowProcess process, Tap tap, JobConf conf) { conf.setInputFormat(TableInputFormat.class); diff --git a/maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java b/maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java index 37ebfb0a8e..cc5f764215 100644 --- a/maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java +++ b/maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java @@ -143,7 +143,7 @@ protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningExceptio } @Override - public void sinkConfInit(FlowProcess process, JobConf conf) { + public void sinkConfInit(FlowProcess process, JobConf conf) { if(quorumNames != null) { conf.set("hbase.zookeeper.quorum", quorumNames); } @@ -178,12 +178,12 @@ public String getIdentifier() { } @Override - public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) throws IOException { + public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) throws IOException { return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); } @Override - public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) throws IOException { + public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) throws IOException { HBaseTapCollector hBaseCollector = new HBaseTapCollector( jobConfFlowProcess, this ); hBaseCollector.prepare(); return hBaseCollector; @@ -230,7 +230,7 @@ public long getModifiedTime(JobConf jobConf) throws IOException { } @Override - public void sourceConfInit(FlowProcess process, JobConf conf) { + public void sourceConfInit(FlowProcess process, JobConf conf) { // a hack for MultiInputFormat to see that there is a child format FileInputFormat.setInputPaths( conf, getPath() ); diff --git a/maple/src/main/java/com/twitter/maple/hbase/HBaseTapCollector.java b/maple/src/main/java/com/twitter/maple/hbase/HBaseTapCollector.java index f5ad1ed2dd..1f726c465a 100644 --- a/maple/src/main/java/com/twitter/maple/hbase/HBaseTapCollector.java +++ b/maple/src/main/java/com/twitter/maple/hbase/HBaseTapCollector.java @@ -42,7 +42,7 @@ public class HBaseTapCollector extends TupleEntrySchemeCollector hadoopFlowProcess; + private final FlowProcess hadoopFlowProcess; /** Field tap */ private final Tap tap; /** Field reporter */ @@ -58,7 +58,7 @@ public class HBaseTapCollector extends TupleEntrySchemeCollector flowProcess, Tap tap) throws IOException { + public HBaseTapCollector(FlowProcess flowProcess, Tap tap) throws IOException { super(flowProcess, tap.getScheme()); this.hadoopFlowProcess = flowProcess; this.tap = tap; diff --git a/maple/src/main/java/com/twitter/maple/tap/MemorySinkTap.java b/maple/src/main/java/com/twitter/maple/tap/MemorySinkTap.java index 6b71b08b5b..243c9b0712 100644 --- a/maple/src/main/java/com/twitter/maple/tap/MemorySinkTap.java +++ b/maple/src/main/java/com/twitter/maple/tap/MemorySinkTap.java @@ -7,6 +7,7 @@ import cascading.tuple.Tuple; import cascading.tuple.TupleEntry; import cascading.tuple.TupleEntryIterator; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import java.io.File; @@ -43,7 +44,7 @@ public static String getTempDir() { } @Override - public boolean commitResource(JobConf conf) throws java.io.IOException { + public boolean commitResource(Configuration conf) throws java.io.IOException { TupleEntryIterator it = new HadoopFlowProcess(conf).openTapForRead(this); boolean first_time = true; diff --git a/maple/src/main/java/com/twitter/maple/tap/MemorySourceTap.java b/maple/src/main/java/com/twitter/maple/tap/MemorySourceTap.java index 1d07de3a23..fdff677ff1 100644 --- a/maple/src/main/java/com/twitter/maple/tap/MemorySourceTap.java +++ b/maple/src/main/java/com/twitter/maple/tap/MemorySourceTap.java @@ -46,7 +46,7 @@ public List getTuples() { } @Override - public void sourceConfInit(FlowProcess flowProcess, + public void sourceConfInit(FlowProcess flowProcess, Tap, Void> tap, JobConf conf) { FileInputFormat.setInputPaths(conf, this.id); conf.setInputFormat(TupleMemoryInputFormat.class); @@ -54,13 +54,13 @@ public void sourceConfInit(FlowProcess flowProcess, } @Override - public void sinkConfInit(FlowProcess flowProcess, + public void sinkConfInit(FlowProcess flowProcess, Tap, Void> tap, JobConf conf) { throw new UnsupportedOperationException("Not supported yet."); } @Override - public void sourcePrepare( FlowProcess flowProcess, SourceCall flowProcess, SourceCall> sourceCall ) { sourceCall.setContext( new Object[ 2 ] ); @@ -69,7 +69,7 @@ public void sourcePrepare( FlowProcess flowProcess, SourceCall flowProcess, SourceCall flowProcess, SourceCall> sourceCall) throws IOException { TupleWrapper key = (TupleWrapper) sourceCall.getContext()[ 0 ]; NullWritable value = (NullWritable) sourceCall.getContext()[ 1 ]; @@ -84,13 +84,13 @@ public boolean source(FlowProcess flowProcess, SourceCall flowProcess, SourceCall flowProcess, SourceCall> sourceCall ) { sourceCall.setContext( null ); } @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall ) throws IOException { + public void sink(FlowProcess flowProcess, SinkCall sinkCall ) throws IOException { throw new UnsupportedOperationException("Not supported."); } @@ -127,7 +127,7 @@ public boolean equals(Object object) { } @Override - public TupleEntryIterator openForRead( FlowProcess flowProcess, RecordReader flowProcess, RecordReader input ) throws IOException { // input may be null when this method is called on the client side or cluster side when accumulating // for a HashJoin diff --git a/maple/src/main/java/com/twitter/maple/tap/StdoutTap.java b/maple/src/main/java/com/twitter/maple/tap/StdoutTap.java index 5c3f5f0b29..6975ec5643 100644 --- a/maple/src/main/java/com/twitter/maple/tap/StdoutTap.java +++ b/maple/src/main/java/com/twitter/maple/tap/StdoutTap.java @@ -5,6 +5,7 @@ import cascading.tap.hadoop.Lfs; import cascading.tuple.Fields; import cascading.tuple.TupleEntryIterator; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import java.io.File; @@ -32,7 +33,7 @@ public static String getTempDir() { } @Override - public boolean commitResource(JobConf conf) throws java.io.IOException { + public boolean commitResource(Configuration conf) throws java.io.IOException { TupleEntryIterator it = new HadoopFlowProcess(conf).openTapForRead(this); System.out.println(""); System.out.println(""); diff --git a/project/Build.scala b/project/Build.scala index faaec2e027..12c85d91ce 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -24,17 +24,17 @@ object ScaldingBuild extends Build { val algebirdVersion = "0.11.0" val avroVersion = "1.7.4" val bijectionVersion = "0.8.1" - val cascadingAvroVersion = "2.1.2" + val cascadingAvroVersion = "3.0-SNAPSHOT" // https://github.com/ScaleUnlimited/cascading.avro/pull/44 val chillVersion = "0.7.1" - val elephantbirdVersion = "4.8" + val elephantbirdVersion = "4.11-SNAPSHOT" // https://github.com/twitter/elephant-bird/pull/454 val hadoopLzoVersion = "0.4.19" - val hadoopVersion = "2.5.0" + val hadoopVersion = "2.7.1" val hbaseVersion = "0.94.10" val hravenVersion = "0.9.17.t05" val jacksonVersion = "2.4.2" val json4SVersion = "3.2.11" val paradiseVersion = "2.0.1" - val parquetVersion = "1.8.1" + val parquetVersion = "1.8.2-SNAPSHOT" // https://github.com/apache/parquet-mr/pull/284 val protobufVersion = "2.4.1" val quasiquotesVersion = "2.0.1" val scalaCheckVersion = "1.12.2" @@ -44,6 +44,9 @@ object ScaldingBuild extends Build { val slf4jVersion = "1.6.6" val thriftVersion = "0.5.0" val junitVersion = "4.10" + + /* NOTE: the temp.cchepelov.* groupIds are to let the scalding build machine access the patched upstream dependencies until they get merged. + This *must* be removed before proceeding. */ val printDependencyClasspath = taskKey[Unit]("Prints location of the dependencies") @@ -76,7 +79,9 @@ object ScaldingBuild extends Build { "Concurrent Maven Repo" at "http://conjars.org/repo", "Clojars Repository" at "http://clojars.org/repo", "Twitter Maven" at "http://maven.twttr.com", - "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/" + "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/", + + "Conjars Repository" at "http://conjars.org/repo" /* TEMPORARY: to get at the depencencies' snapshots while preparing the PR1446 branch */ ), printDependencyClasspath := { @@ -113,7 +118,7 @@ object ScaldingBuild extends Build { testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-oF"), // Uncomment if you don't want to run all the tests before building assembly - // test in assembly := {}, + test in assembly := {}, logLevel in assembly := Level.Warn, // Publishing options: @@ -216,6 +221,8 @@ object ScaldingBuild extends Build { scaldingJson, scaldingJdbc, scaldingHadoopTest, + scaldingHadoop2MR1Test, + scaldingHadoop2TezTest, scaldingDb, maple, executionTutorial, @@ -230,7 +237,7 @@ object ScaldingBuild extends Build { ).settings( test := {}, publish := {}, // skip publishing for this root project. - publishLocal := {} + publishLocal := {} ).aggregate( scaldingArgs, scaldingDate, @@ -258,7 +265,7 @@ object ScaldingBuild extends Build { * This returns the youngest jar we released that is compatible with * the current. */ - val unreleasedModules = Set[String]("hadoop-test") //releases 0.11 + val unreleasedModules = Set[String]("hadoop-test", "hadoop2-mr1-test", "hadoop2-tez") //releases 0.11 def youngestForwardCompatible(subProj: String) = Some(subProj) @@ -280,10 +287,13 @@ object ScaldingBuild extends Build { lazy val scaldingDate = module("date") lazy val cascadingVersion = - System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "2.6.1") + System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "3.1.0-wip-52") + + lazy val tezVersion = + System.getenv.asScala.getOrElse("SCALDING_TEZ_VERSION", "0.8.2") // should match what cascading-hadoop2-tez is using lazy val cascadingJDBCVersion = - System.getenv.asScala.getOrElse("SCALDING_CASCADING_JDBC_VERSION", "2.6.0") + System.getenv.asScala.getOrElse("SCALDING_CASCADING_JDBC_VERSION", "3.0.0-wip-127") lazy val scaldingBenchmarks = module("benchmarks").settings( libraryDependencies ++= Seq( @@ -297,7 +307,7 @@ object ScaldingBuild extends Build { lazy val scaldingCore = module("core").settings( libraryDependencies <++= (scalaVersion) { scalaVersion => Seq( "cascading" % "cascading-core" % cascadingVersion, - "cascading" % "cascading-hadoop" % cascadingVersion, + "cascading" % "cascading-hadoop" % cascadingVersion % "provided", "cascading" % "cascading-local" % cascadingVersion, "com.twitter" % "chill-hadoop" % chillVersion, "com.twitter" % "chill-java" % chillVersion, @@ -319,14 +329,17 @@ object ScaldingBuild extends Build { lazy val scaldingCommons = module("commons").settings( libraryDependencies ++= Seq( + "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", // TODO: split into scalding-protobuf "com.google.protobuf" % "protobuf-java" % protobufVersion, "com.twitter" %% "bijection-core" % bijectionVersion, "com.twitter" %% "algebird-core" % algebirdVersion, "com.twitter" %% "chill" % chillVersion, - "com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion, - "com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion, + "temp.cchepelov.com.twitter.elephantbird" % "elephant-bird-cascading3" % elephantbirdVersion, + "temp.cchepelov.com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion, "com.hadoop.gplcompression" % "hadoop-lzo" % hadoopLzoVersion, + "cascading" % "cascading-hadoop" % cascadingVersion % "provided", + "cascading" % "cascading-hadoop" % cascadingVersion % "test", // TODO: split this out into scalding-thrift "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", "org.apache.thrift" % "libthrift" % thriftVersion, @@ -340,7 +353,7 @@ object ScaldingBuild extends Build { lazy val scaldingAvro = module("avro").settings( libraryDependencies ++= Seq( - "cascading.avro" % "avro-scheme" % cascadingAvroVersion, + "temp.cchepelov.cascading.avro" % "avro-scheme" % cascadingAvroVersion, "org.apache.avro" % "avro" % avroVersion, "org.slf4j" % "slf4j-api" % slf4jVersion, "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided" @@ -350,13 +363,18 @@ object ScaldingBuild extends Build { lazy val scaldingParquet = module("parquet").settings( libraryDependencies <++= (scalaVersion) { scalaVersion => Seq( // see https://issues.apache.org/jira/browse/PARQUET-143 for exclusions - "org.apache.parquet" % "parquet-cascading" % parquetVersion + "temp.cchepelov.org.apache.parquet" % "parquet-cascading3" % parquetVersion // FIXME: https://github.com/apache/parquet-mr/pull/284 + exclude("temp.cchepelov.com.twitter.elephantbird", "elephant-bird-pig") + exclude("temp.cchepelov.com.twitter.elephantbird", "elephant-bird-core") + exclude("temp.cchepelov.org.apache.parquet", "parquet-pig") exclude("org.apache.parquet", "parquet-pig") exclude("com.twitter.elephantbird", "elephant-bird-pig") exclude("com.twitter.elephantbird", "elephant-bird-core"), "org.apache.thrift" % "libthrift" % "0.7.0", "org.slf4j" % "slf4j-api" % slf4jVersion, "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", + "cascading" % "cascading-hadoop" % cascadingVersion % "provided", + "cascading" % "cascading-hadoop" % cascadingVersion % "test", "org.scala-lang" % "scala-reflect" % scalaVersion, "com.twitter" %% "bijection-macros" % bijectionVersion, "com.twitter" %% "chill-bijection" % chillVersion @@ -395,20 +413,33 @@ object ScaldingBuild extends Build { .settings( libraryDependencies ++= Seq( // see https://issues.apache.org/jira/browse/PARQUET-143 for exclusions - "org.apache.parquet" % "parquet-cascading" % parquetVersion + "temp.cchepelov.org.apache.parquet" % "parquet-cascading3" % parquetVersion + exclude("temp.cchepelov.com.twitter.elephantbird", "elephant-bird-pig") + exclude("temp.cchepelov.com.twitter.elephantbird", "elephant-bird-core") + exclude("temp.cchepelov.org.apache.parquet", "parquet-pig") exclude("org.apache.parquet", "parquet-pig") exclude("com.twitter.elephantbird", "elephant-bird-pig") exclude("com.twitter.elephantbird", "elephant-bird-core"), "org.slf4j" % "slf4j-api" % slf4jVersion, - "org.apache.parquet" % "parquet-thrift" % parquetVersion % "test" classifier "tests", + "org.apache.parquet" % "parquet-thrift" % "1.8.1" /* FIXME: parquetVersion */ % "test" classifier "tests" + exclude("temp.cchepelov.com.twitter.elephantbird", "elephant-bird-pig") + exclude("temp.cchepelov.com.twitter.elephantbird", "elephant-bird-core") + exclude("org.apache.parquet", "parquet-pig") + exclude("com.twitter.elephantbird", "elephant-bird-pig") + exclude("com.twitter.elephantbird", "elephant-bird-core"), "com.twitter" %% "scrooge-serializer" % scroogeVersion, + "temp.cchepelov.com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion, + "temp.cchepelov.com.twitter.elephantbird" % "elephant-bird-pig" % elephantbirdVersion, "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", + "cascading" % "cascading-hadoop" % hadoopVersion % "provided", + "cascading" % "cascading-hadoop" % hadoopVersion % "test", "com.novocode" % "junit-interface" % "0.11" % "test", "junit" % "junit" % junitVersion % "test" ) ).dependsOn(scaldingCore, scaldingParquet % "compile->compile;test->test", scaldingParquetScroogeFixtures % "test->test") + lazy val scaldingHRaven = module("hraven").settings( libraryDependencies ++= Seq( "com.twitter.hraven" % "hraven-core" % hravenVersion @@ -452,6 +483,8 @@ object ScaldingBuild extends Build { "org.scala-lang" % "scala-reflect" % scalaVersion, "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "unprovided", + "cascading" % "cascading-hadoop" % cascadingVersion /* % "provided" */, /* need to have a fabric around, can't just leave it to "provided" */ + "cascading" % "cascading-hadoop" % cascadingVersion % "unprovided", /* for 'sbt run' */ "org.slf4j" % "slf4j-api" % slf4jVersion, "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "provided", "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "unprovided" @@ -481,9 +514,10 @@ object ScaldingBuild extends Build { lazy val scaldingJson = module("json").settings( libraryDependencies <++= (scalaVersion) { scalaVersion => Seq( "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", + "cascading" % "cascading-hadoop" % cascadingVersion % "provided", "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion, "org.json4s" %% "json4s-native" % json4SVersion, - "com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion % "provided" + "temp.cchepelov.com.twitter.elephantbird" % "elephant-bird-cascading3" % elephantbirdVersion % "provided" ) } ).dependsOn(scaldingCore) @@ -506,6 +540,8 @@ object ScaldingBuild extends Build { "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion classifier "tests", "org.apache.hadoop" % "hadoop-common" % hadoopVersion classifier "tests", "org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion classifier "tests", + "cascading" % "cascading-hadoop" % cascadingVersion % "provided", + "cascading" % "cascading-hadoop" % cascadingVersion % "test", "com.twitter" %% "chill-algebird" % chillVersion, "org.slf4j" % "slf4j-api" % slf4jVersion, "org.slf4j" % "slf4j-log4j12" % slf4jVersion, @@ -515,6 +551,23 @@ object ScaldingBuild extends Build { } ).dependsOn(scaldingCore, scaldingSerialization) + lazy val scaldingHadoop2MR1Test = module("hadoop2-mr1-test").settings( + libraryDependencies <++= (scalaVersion) { scalaVersion => Seq( + "cascading" % "cascading-hadoop2-mr1" % cascadingVersion % "test" + ) } + ).dependsOn(scaldingCore, scaldingSerialization, scaldingHadoopTest) + + lazy val scaldingHadoop2TezTest = module("hadoop2-tez-test").settings( + libraryDependencies <++= (scalaVersion) { scalaVersion => Seq( + "org.apache.tez" % "tez-api" % tezVersion % "test", + "org.apache.tez" % "tez-dag" % tezVersion % "test", + "org.apache.tez" % "tez-mapreduce" % tezVersion % "test", + "org.apache.tez" % "tez-mapreduce" % tezVersion % "test" classifier "tests", + "cascading" % "cascading-hadoop2-tez" % cascadingVersion % "test" + ) } + ).dependsOn(scaldingCore, scaldingSerialization, scaldingHadoopTest) + + // This one uses a different naming convention lazy val maple = Project( id = "maple", @@ -532,7 +585,8 @@ object ScaldingBuild extends Build { libraryDependencies <++= (scalaVersion) { scalaVersion => Seq( "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", "org.apache.hbase" % "hbase" % hbaseVersion % "provided", - "cascading" % "cascading-hadoop" % cascadingVersion + "cascading" % "cascading-core" % cascadingVersion, + "cascading" % "cascading-hadoop" % cascadingVersion % "provided" ) } ) @@ -592,7 +646,9 @@ lazy val scaldingThriftMacros = module("thrift-macros") "org.apache.hadoop" % "hadoop-yarn-server" % hadoopVersion % "test", "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion classifier "tests", "org.apache.hadoop" % "hadoop-common" % hadoopVersion classifier "tests", - "org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion classifier "tests" + "org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion classifier "tests", + "cascading" % "cascading-hadoop" % cascadingVersion % "test", + "cascading" % "cascading-hadoop" % cascadingVersion classifier "tests" ) ++ (if (isScala210x(scalaVersion)) Seq("org.scalamacros" %% "quasiquotes" % "2.0.1") else Seq()) }, addCompilerPlugin("org.scalamacros" % "paradise" % "2.0.1" cross CrossVersion.full) diff --git a/scalding-avro/src/main/scala/com/twitter/scalding/avro/AvroSource.scala b/scalding-avro/src/main/scala/com/twitter/scalding/avro/AvroSource.scala index 366f11869f..9355d11afa 100644 --- a/scalding-avro/src/main/scala/com/twitter/scalding/avro/AvroSource.scala +++ b/scalding-avro/src/main/scala/com/twitter/scalding/avro/AvroSource.scala @@ -25,6 +25,7 @@ import java.io.InputStream import java.io.OutputStream import java.util.Properties import cascading.tuple.Fields +import org.apache.hadoop.conf.Configuration import collection.JavaConverters._ import org.apache.hadoop.mapred.{ OutputCollector, RecordReader, JobConf } @@ -32,7 +33,7 @@ trait UnpackedAvroFileScheme extends FileSource { def schema: Option[Schema] // HadoopSchemeInstance gives compile errors in 2.10 for some reason - override def hdfsScheme = (new AvroScheme(schema.getOrElse(null))).asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + override def hdfsScheme = (new AvroScheme(schema.getOrElse(null))).asInstanceOf[Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _]] override def localScheme = (new LAvroScheme(schema.getOrElse(null))).asInstanceOf[Scheme[Properties, InputStream, OutputStream, _, _]] @@ -42,7 +43,7 @@ trait PackedAvroFileScheme[T] extends FileSource { def schema: Schema // HadoopSchemeInstance gives compile errors for this in 2.10 for some reason - override def hdfsScheme = (new PackedAvroScheme[T](schema)).asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + override def hdfsScheme = (new PackedAvroScheme[T](schema)).asInstanceOf[Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _]] override def localScheme = (new LPackedAvroScheme[T](schema)).asInstanceOf[Scheme[Properties, InputStream, OutputStream, _, _]] } diff --git a/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java b/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java index bf95160efb..4a7b8d3974 100644 --- a/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java +++ b/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java @@ -3,6 +3,7 @@ import java.io.IOException; import java.util.Arrays; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; @@ -29,7 +30,7 @@ public static byte[] getBytes(BytesWritable key) { } @Override - public boolean source(FlowProcess flowProcess, + public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException { BytesWritable key = (BytesWritable) sourceCall.getContext()[0]; BytesWritable value = (BytesWritable) sourceCall.getContext()[1]; @@ -47,7 +48,7 @@ public boolean source(FlowProcess flowProcess, } @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall) + public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); @@ -57,4 +58,3 @@ public void sink(FlowProcess flowProcess, SinkCall scheme, TapMode mode) + public VersionedTap(String dir, Scheme scheme, TapMode mode) throws IOException { super(scheme, dir); this.mode = mode; @@ -59,11 +60,11 @@ public String getOutputDirectory() { return getPath().toString(); } - public VersionedStore getStore(JobConf conf) throws IOException { + public VersionedStore getStore(Configuration conf) throws IOException { return new VersionedStore(FileSystem.get(conf), getOutputDirectory()); } - public String getSourcePath(JobConf conf) { + public String getSourcePath(Configuration conf) { VersionedStore store; try { store = getStore(conf); @@ -77,7 +78,7 @@ public String getSourcePath(JobConf conf) { } } - public String getSinkPath(JobConf conf) { + public String getSinkPath(Configuration conf) { try { VersionedStore store = getStore(conf); String sinkPath = (version == null) ? store.createVersion() : store.createVersion(version); @@ -91,33 +92,37 @@ public String getSinkPath(JobConf conf) { } @Override - public void sourceConfInit(FlowProcess process, JobConf conf) { - super.sourceConfInit(process, conf); - FileInputFormat.setInputPaths(conf, getSourcePath(conf)); + public void sourceConfInit(FlowProcess process, Configuration conf) { + JobConf jobConf = new JobConf(conf); + + super.sourceConfInit(process, jobConf); + FileInputFormat.setInputPaths(jobConf, getSourcePath(jobConf)); } @Override - public void sinkConfInit(FlowProcess process, JobConf conf) { - super.sinkConfInit(process, conf); + public void sinkConfInit(FlowProcess process, Configuration conf) { + JobConf jobConf = new JobConf(conf); + + super.sinkConfInit(process, jobConf); if (newVersionPath == null) - newVersionPath = getSinkPath(conf); + newVersionPath = getSinkPath(jobConf); - FileOutputFormat.setOutputPath(conf, new Path(newVersionPath)); + FileOutputFormat.setOutputPath(jobConf, new Path(newVersionPath)); } @Override - public boolean resourceExists(JobConf jc) throws IOException { + public boolean resourceExists(Configuration jc) throws IOException { return getStore(jc).mostRecentVersion() != null; } @Override - public boolean createResource(JobConf jc) throws IOException { + public boolean createResource(Configuration jc) throws IOException { throw new UnsupportedOperationException("Not supported yet."); } @Override - public boolean deleteResource(JobConf jc) throws IOException { + public boolean deleteResource(Configuration jc) throws IOException { throw new UnsupportedOperationException("Not supported yet."); } @@ -131,18 +136,20 @@ public String getIdentifier() { } @Override - public long getModifiedTime(JobConf conf) throws IOException { + public long getModifiedTime(Configuration conf) throws IOException { VersionedStore store = getStore(conf); return (mode == TapMode.SINK) ? 0 : store.mostRecentVersion(); } @Override - public boolean commitResource(JobConf conf) throws IOException { - VersionedStore store = new VersionedStore(FileSystem.get(conf), getOutputDirectory()); + public boolean commitResource(Configuration conf) throws IOException { + JobConf jobConf = new JobConf(conf); + + VersionedStore store = new VersionedStore(FileSystem.get(jobConf), getOutputDirectory()); if (newVersionPath != null) { store.succeedVersion(newVersionPath); - markSuccessfulOutputDir(new Path(newVersionPath), conf); + markSuccessfulOutputDir(new Path(newVersionPath), jobConf); newVersionPath = null; store.cleanup(getVersionsToKeep()); } @@ -150,7 +157,7 @@ public boolean commitResource(JobConf conf) throws IOException { return true; } - private static void markSuccessfulOutputDir(Path path, JobConf conf) throws IOException { + private static void markSuccessfulOutputDir(Path path, Configuration conf) throws IOException { FileSystem fs = FileSystem.get(conf); // create a file in the folder to mark it if (fs.exists(path)) { @@ -160,7 +167,7 @@ private static void markSuccessfulOutputDir(Path path, JobConf conf) throws IOEx } @Override - public boolean rollbackResource(JobConf conf) throws IOException { + public boolean rollbackResource(Configuration conf) throws IOException { if (newVersionPath != null) { getStore(conf).failVersion(newVersionPath); newVersionPath = null; diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/DailySources.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/DailySources.scala index 21fa7b7e35..c89e3158ad 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/DailySources.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/DailySources.scala @@ -26,10 +26,12 @@ import cascading.tuple.Fields import java.io.Serializable import org.apache.thrift.TBase +import scala.annotation.meta.param + // Retrieve implicits import Dsl._ -abstract class DailySuffixLzoCodec[T](prefix: String, dateRange: DateRange)(implicit @transient suppliedInjection: Injection[T, Array[Byte]]) +abstract class DailySuffixLzoCodec[T](prefix: String, dateRange: DateRange)(implicit @(transient @param) suppliedInjection: Injection[T, Array[Byte]]) extends DailySuffixSource(prefix, dateRange) with LzoCodec[T] { val boxed = Externalizer(suppliedInjection) override lazy val injection = boxed.get diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/HourlySources.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/HourlySources.scala index 568dce0609..1ecd671772 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/HourlySources.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/HourlySources.scala @@ -24,7 +24,9 @@ import com.twitter.scalding._ import com.twitter.scalding.source._ import org.apache.thrift.TBase -abstract class HourlySuffixLzoCodec[T](prefix: String, dateRange: DateRange)(implicit @transient suppliedInjection: Injection[T, Array[Byte]]) +import scala.annotation.meta.param + +abstract class HourlySuffixLzoCodec[T](prefix: String, dateRange: DateRange)(implicit @(transient @param) suppliedInjection: Injection[T, Array[Byte]]) extends HourlySuffixSource(prefix, dateRange) with LzoCodec[T] { val boxed = Externalizer(suppliedInjection) override lazy val injection = boxed.get diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala index 25a6c4e0cf..449ef955c5 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala @@ -16,11 +16,12 @@ limitations under the License. package com.twitter.scalding.commons.source +import scala.annotation.meta.param import scala.reflect.ClassTag import com.twitter.bijection._ import com.twitter.chill.Externalizer -import com.twitter.elephantbird.cascading2.scheme.LzoBinaryScheme +import com.twitter.elephantbird.cascading3.scheme.LzoBinaryScheme import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat import com.twitter.elephantbird.mapreduce.io.{ BinaryConverter, GenericWritable } import com.twitter.elephantbird.mapreduce.input.{ BinaryConverterProvider, MultiInputFormat } @@ -97,7 +98,7 @@ object LzoGenericScheme { /** * From a Binary Converter passed in configure in the JobConf using of that by ElephantBird */ - def setConverter[M](conv: BinaryConverter[M], conf: JobConf, confKey: String, overrideConf: Boolean = false): Unit = { + def setConverter[M](conv: BinaryConverter[M], conf: Configuration, confKey: String, overrideConf: Boolean = false): Unit = { if ((conf.get(confKey) == null) || overrideConf) { val extern = Externalizer(conv) try { @@ -115,26 +116,28 @@ object LzoGenericScheme { * Generic scheme for data stored as lzo-compressed protobuf messages. * Serialization is performed using the supplied BinaryConverter. */ -class LzoGenericScheme[M](@transient conv: BinaryConverter[M], clazz: Class[M]) extends LzoBinaryScheme[M, GenericWritable[M]] { +class LzoGenericScheme[M](@(transient @param) conv: BinaryConverter[M], clazz: Class[M]) extends LzoBinaryScheme[M, GenericWritable[M]] { + + val convBox = Externalizer(conv) override protected def prepareBinaryWritable(): GenericWritable[M] = - new GenericWritable(conv) + new GenericWritable(convBox.get) - override def sourceConfInit(fp: FlowProcess[JobConf], - tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], - conf: JobConf): Unit = { + override def sourceConfInit(fp: FlowProcess[_ <: Configuration], + tap: Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]], + conf: Configuration): Unit = { - LzoGenericScheme.setConverter(conv, conf, SourceConfigBinaryConverterProvider.ProviderConfKey) + LzoGenericScheme.setConverter(convBox.get, conf, SourceConfigBinaryConverterProvider.ProviderConfKey) MultiInputFormat.setClassConf(clazz, conf) MultiInputFormat.setGenericConverterClassConf(classOf[SourceConfigBinaryConverterProvider[_]], conf) DelegateCombineFileInputFormat.setDelegateInputFormat(conf, classOf[MultiInputFormat[_]]) } - override def sinkConfInit(fp: FlowProcess[JobConf], - tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], - conf: JobConf): Unit = { - LzoGenericScheme.setConverter(conv, conf, SinkConfigBinaryConverterProvider.ProviderConfKey) + override def sinkConfInit(fp: FlowProcess[_ <: Configuration], + tap: Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]], + conf: Configuration): Unit = { + LzoGenericScheme.setConverter(convBox.get, conf, SinkConfigBinaryConverterProvider.ProviderConfKey) LzoGenericBlockOutputFormat.setClassConf(clazz, conf) LzoGenericBlockOutputFormat.setGenericConverterClassConf(classOf[SinkConfigBinaryConverterProvider[_]], conf) DeprecatedOutputFormatWrapper.setOutputFormat(classOf[LzoGenericBlockOutputFormat[_]], conf) diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala index eeb28fc929..def9bc1673 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala @@ -22,7 +22,7 @@ import cascading.scheme.Scheme import org.apache.thrift.TBase import com.google.protobuf.Message import com.twitter.bijection.Injection -import com.twitter.elephantbird.cascading2.scheme._ +import com.twitter.elephantbird.cascading3.scheme._ import com.twitter.scalding._ import com.twitter.scalding.Dsl._ import com.twitter.scalding.source.{ CheckedInversion, MaxFailuresCheck } diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala index 089968b9a4..83bc91907a 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala @@ -3,7 +3,7 @@ package com.twitter.scalding.commons.source import cascading.scheme.Scheme import cascading.scheme.hadoop.{ TextDelimited => CHTextDelimited } import cascading.scheme.local.{ TextDelimited => CLTextDelimited } -import com.twitter.elephantbird.cascading2.scheme.LzoTextDelimited +import com.twitter.elephantbird.cascading3.scheme.LzoTextDelimited import com.twitter.scalding._ import com.twitter.scalding.source.TypedTextDelimited import com.twitter.scalding.source.TypedSep diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala index 177bb3c416..7d3873dfc9 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala @@ -33,6 +33,7 @@ import com.twitter.scalding.source.{ CheckedInversion, MaxFailuresCheck } import com.twitter.scalding.typed.KeyedListLike import com.twitter.scalding.typed.TypedSink import org.apache.hadoop.mapred.JobConf +import scala.annotation.meta.param import scala.collection.JavaConverters._ /** @@ -54,7 +55,7 @@ object VersionedKeyValSource { class VersionedKeyValSource[K, V](val path: String, val sourceVersion: Option[Long], val sinkVersion: Option[Long], val maxFailures: Int, val versionsToKeep: Int)( - implicit @transient codec: Injection[(K, V), (Array[Byte], Array[Byte])]) extends Source + implicit @(transient @param) codec: Injection[(K, V), (Array[Byte], Array[Byte])]) extends Source with Mappable[(K, V)] with TypedSink[(K, V)] { diff --git a/scalding-commons/src/test/scala/com/twitter/scalding/commons/VersionedKeyValSourceTest.scala b/scalding-commons/src/test/scala/com/twitter/scalding/commons/VersionedKeyValSourceTest.scala index 74b0dd4c67..8d01cc78c7 100644 --- a/scalding-commons/src/test/scala/com/twitter/scalding/commons/VersionedKeyValSourceTest.scala +++ b/scalding-commons/src/test/scala/com/twitter/scalding/commons/VersionedKeyValSourceTest.scala @@ -15,6 +15,7 @@ limitations under the License. */ package com.twitter.scalding.commons.source +import org.apache.hadoop.fs.Path import org.scalatest.{ Matchers, WordSpec } import com.twitter.scalding._ import com.twitter.scalding.commons.datastores.VersionedStore; @@ -135,7 +136,12 @@ class VersionedKeyValSourceTest extends WordSpec with Matchers { val store = new VersionedStore(root.getAbsolutePath) versions foreach { v => val p = store.createVersion(v) + /* since dfs-datastores 1.3.5, store.succeedVersion() will fail if the directory + doesn't exist, and it won't exist after createVersion() until data is actually inserted. + So we cheat here to keep the test mechanism alive + */ new File(p).mkdirs() + /* /cheat */ store.succeedVersion(p) } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala index 9fa95b0e2c..09ae379969 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala @@ -18,11 +18,12 @@ package com.twitter.scalding import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.io.serializer.{ Serialization => HSerialization } +import org.slf4j.{ Logger, LoggerFactory } import com.twitter.chill.{ ExternalizerCodec, ExternalizerInjection, Externalizer, KryoInstantiator } import com.twitter.chill.config.{ ScalaMapConfig, ConfiguredInstantiator } import com.twitter.bijection.{ Base64String, Injection } -import cascading.pipe.assembly.AggregateBy +import cascading.pipe.assembly.{ AggregateByProps, AggregateBy } import cascading.flow.{ FlowListener, FlowStepListener, FlowProps, FlowStepStrategy } import cascading.property.AppProps import cascading.tuple.collect.SpillableProps @@ -36,6 +37,7 @@ import scala.util.{ Failure, Success, Try } * This is a wrapper class on top of Map[String, String] */ trait Config extends Serializable { + @transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass) import Config._ // get the constants def toMap: Map[String, String] @@ -101,15 +103,21 @@ trait Config extends Serializable { def setMapSpillThreshold(count: Int): Config = this + (SpillableProps.MAP_THRESHOLD -> count.toString) + @deprecated("deprecated in Cascading 2.7 and dropped in Cascading 3.0, use setMapSideAggregationCapacity", "cascading 2.7") + def setMapSideAggregationThreshold(count: Int): Config = { + logger.warn("Ignoring deprecated setMapSideAggregationThreshold") + this + } + /* * Used in map-side aggregation of associative operations (Semigroup/Monoid) * This controls how many keys are in an in-memory cache. If a significant * probability mass of the key-space is far bigger than this value, it * does not help much (and may hurt, so experiment with disabling to get - * the best results + * the best results) */ - def setMapSideAggregationThreshold(count: Int): Config = - this + (AggregateBy.AGGREGATE_BY_THRESHOLD -> count.toString) + def setMapSideAggregationCapacity(capacity: Int): Config = + this + (AggregateByProps.AGGREGATE_BY_CAPACITY -> capacity.toString) /** * Set this configuration option to require all grouping/cogrouping @@ -352,8 +360,11 @@ trait Config extends Serializable { .toList /** Get the number of reducers (this is the parameter Hadoop will use) */ - def getNumReducers: Option[Int] = get(Config.HadoopNumReducers).map(_.toInt) - def setNumReducers(n: Int): Config = this + (Config.HadoopNumReducers -> n.toString) + def getNumReducers: Option[Int] = get(Config.HadoopNumReducersLegacy) + .orElse(get(Config.HadoopNumReducers2)) + .map(_.toInt) + + def setNumReducers(n: Int): Config = this + (Config.HadoopNumReducersLegacy -> n.toString) // Note: setting the legacy key for cascading-hadoop compat, hadoop-2.6.0 still accepts it. /** Set username from System.used for querying hRaven. */ def setHRavenHistoryUserName: Config = @@ -387,7 +398,13 @@ object Config { * Parameter that actually controls the number of reduce tasks. * Be sure to set this in the JobConf for the *step* not the flow. */ - val HadoopNumReducers = "mapred.reduce.tasks" + val HadoopNumReducersLegacy = "mapred.reduce.tasks" + val HadoopNumReducers2 = "mapreduce.job.reduces" + + @deprecated( + message = "please select between HadoopNumReducersLegacy or HadoopNumReducers2. Or use getNumReducers()", + since = "2016-01-16") + val HadoopNumReducers = HadoopNumReducersLegacy // kept for source-level compatibility, for now (RFC)d /** Name of parameter to specify which class to use as the default estimator. */ val ReducerEstimators = "scalding.reducer.estimator.classes" @@ -412,7 +429,7 @@ object Config { empty .setListSpillThreshold(100 * 1000) .setMapSpillThreshold(100 * 1000) - .setMapSideAggregationThreshold(100 * 1000) + .setMapSideAggregationCapacity(100 * 1000) .setSerialization(Right(classOf[serialization.KryoHadoop])) .setScaldingVersion .setHRavenHistoryUserName @@ -499,9 +516,7 @@ object Config { * This copy also forces all expressions in values to be evaluated, freezing them * as well. */ - def fromHadoop(conf: Configuration): Config = - // use `conf.get` to force JobConf to evaluate expressions - Config(conf.asScala.map { e => e.getKey -> conf.get(e.getKey) }.toMap) + def fromHadoop(anyConf: Any): Config = ConfigBridge.fromPlatform(anyConf) /* * For everything BUT SERIALIZATION, this prefers values in conf, @@ -512,7 +527,7 @@ object Config { (empty .setListSpillThreshold(100 * 1000) .setMapSpillThreshold(100 * 1000) - .setMapSideAggregationThreshold(100 * 1000) ++ fromHadoop(conf)) + .setMapSideAggregationCapacity(100 * 1000) ++ fromHadoop(conf)) .setSerialization(Right(classOf[serialization.KryoHadoop])) .setScaldingVersion /* diff --git a/scalding-core/src/main/scala/com/twitter/scalding/ConfigBridge.scala b/scalding-core/src/main/scala/com/twitter/scalding/ConfigBridge.scala new file mode 100644 index 0000000000..a4d071ff71 --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/ConfigBridge.scala @@ -0,0 +1,51 @@ +package com.twitter.scalding + +import cascading.flow.FlowStep + +import scala.collection.JavaConverters._ + +/** + * Created by cchepelov on 13/01/16. + */ +object ConfigBridge { + /** + * This adapter handles the various vessel types that can be used to configure properties of a a + * FlowStep[_<: Any] since Cascading 3.0 + */ + class FlowStepAdapter(flowStep: FlowStep[_ <: Any]) { + def getConfigValue(key: String): String = + flowStep.getConfig match { + case conf: org.apache.hadoop.conf.Configuration => conf.get(key) + case conf: org.apache.commons.configuration.Configuration => conf.getString(key) + case conf: java.util.Properties => conf.getProperty(key) + case _ => throw new NotImplementedError(s"unknown flowStep Config type ${flowStep.getConfig.getClass}") + } + + def setConfigValue(name: String, value: String): FlowStep[_ <: Any] = { + flowStep.getConfig match { + case conf: org.apache.hadoop.conf.Configuration => conf.set(name, value) + case conf: org.apache.commons.configuration.Configuration => conf.addProperty(name, value) + case conf: java.util.Properties => conf.put(name, value) + case _ => throw new NotImplementedError(s"unknown flowStep Config type ${flowStep.getConfig.getClass}") + } + flowStep + } + } + + def fromPlatform(anyConf: Any): Config = { + anyConf match { + /* NOTE: for now we always return a Config instance (actually, an anonymous realization of the Config trait) + no matter what the underlying fabric/platform. Here would be a GREAT opportunity to return a specific Config + implementation (notably, to deal with things like HadoopNumReducers* ) + */ + + // use `conf.get` to force JobConf to evaluate expressions + case conf: org.apache.hadoop.conf.Configuration => Config(conf.asScala.map { e => e.getKey -> conf.get(e.getKey) }.toMap) + case conf: org.apache.commons.configuration.Configuration => Config(conf.getKeys.asScala.map { k => k.asInstanceOf[String] }.map { (k: String) => k -> conf.getString(k) }.toMap) + case conf: java.util.Properties => Config(conf.asScala.map { e => e._1 -> conf.getProperty(e._1) }.toMap) + case _ => throw new NotImplementedError(s"Can't get from 'hadoop' with configuration type ${anyConf.getClass}") + } + } + + implicit def toFlowStepPimpFromFlowStep(flowStep: FlowStep[_ <: Any]): FlowStepAdapter = new FlowStepAdapter(flowStep) +} \ No newline at end of file diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala index 5481e0a3aa..ad21d3e91b 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala @@ -803,7 +803,7 @@ object ExecutionCounters { * Just gets the counters from the CascadingStats and ignores * all the other fields present */ - def fromCascading(cs: cascading.stats.CascadingStats): ExecutionCounters = new ExecutionCounters { + def fromCascading(cs: cascading.stats.CascadingStats[_]): ExecutionCounters = new ExecutionCounters { import scala.collection.JavaConverters._ val keys = (for { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala index 8c0497866a..44ad574b31 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala @@ -15,9 +15,11 @@ limitations under the License. */ package com.twitter.scalding +import java.util.Properties + import cascading.flow.hadoop.HadoopFlow -import cascading.flow.{ Flow, FlowDef, FlowListener, FlowStepListener, FlowStepStrategy } -import cascading.flow.planner.BaseFlowStep +import cascading.flow._ +import cascading.flow.planner.{ BaseFlowNode, BaseFlowStep } import cascading.pipe.Pipe import com.twitter.scalding.reducer_estimation.ReducerEstimatorStepStrategy import com.twitter.scalding.serialization.CascadingBinaryComparator @@ -43,11 +45,11 @@ trait ExecutionContext { if (descriptions.nonEmpty) Some(descriptions.distinct.mkString(", ")) else None } - private def updateStepConfigWithDescriptions(step: BaseFlowStep[JobConf]): Unit = { - val conf = step.getConfig - getIdentifierOpt(ExecutionContext.getDesc(step)).foreach(descriptionString => { - conf.set(Config.StepDescriptions, descriptionString) - }) + private def updateStepConfigWithDescriptions(step: BaseFlowStep[_ <: Any]): Unit = { + import ConfigBridge._ + + getIdentifierOpt(ExecutionContext.getDesc(step)) + .foreach(descriptionString => step.setConfigValue(Config.StepDescriptions, descriptionString)) } final def buildFlow: Try[Flow[_]] = @@ -79,11 +81,12 @@ trait ExecutionContext { } flow match { - case hadoopFlow: HadoopFlow => - val flowSteps = hadoopFlow.getFlowSteps.asScala + case baseFlow: BaseFlow[_] => + val flowSteps = baseFlow.getFlowSteps.asScala flowSteps.foreach { - case baseFlowStep: BaseFlowStep[JobConf] => + case baseFlowStep: BaseFlowStep[_] => updateStepConfigWithDescriptions(baseFlowStep) + case anyOtherBaseFlowStep => throw new NotImplementedError("unknown flowStep type ${anyOtherBaseFlowStep.getClass}") } case _ => // descriptions not yet supported in other modes } @@ -151,7 +154,7 @@ object ExecutionContext { private val LOG: Logger = LoggerFactory.getLogger(ExecutionContext.getClass) private[scalding] def getDesc[T](baseFlowStep: BaseFlowStep[T]): Seq[String] = { - baseFlowStep.getGraph.vertexSet.asScala.toSeq.flatMap(_ match { + baseFlowStep.getElementGraph.vertexSet.asScala.toSeq.flatMap(_ match { case pipe: Pipe => RichPipe.getPipeDescriptions(pipe) case _ => List() // no descriptions }) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala index c3a661a2d2..5bfc3b6824 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala @@ -53,7 +53,7 @@ abstract class SchemedSource extends Source { throw ModeException("Cascading local mode not supported for: " + toString) /** The scheme to use if the source is on hdfs. */ - def hdfsScheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _] = + def hdfsScheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _] = throw ModeException("Cascading Hadoop mode not supported for: " + toString) // The mode to use for output taps determining how conflicts with existing output are handled. @@ -61,7 +61,7 @@ abstract class SchemedSource extends Source { } trait HfsTapProvider { - def createHfsTap(scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _], + def createHfsTap(scheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _], path: String, sinkMode: SinkMode): Hfs = new Hfs(scheme, path, sinkMode) @@ -69,8 +69,8 @@ trait HfsTapProvider { private[scalding] object CastFileTap { // The scala compiler has problems with the generics in Cascading - def apply(tap: FileTap): Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]] = - tap.asInstanceOf[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] + def apply(tap: FileTap): Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]] = + tap.asInstanceOf[Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]]] } /** @@ -89,7 +89,7 @@ trait LocalSourceOverride extends SchemedSource { * @param sinkMode The mode for handling output conflicts. * @returns A tap. */ - def createLocalTap(sinkMode: SinkMode): Tap[JobConf, _, _] = { + def createLocalTap(sinkMode: SinkMode): Tap[Configuration, _, _] = { val taps = localPaths.map { p: String => CastFileTap(new FileTap(localScheme, p, sinkMode)) @@ -230,7 +230,7 @@ abstract class FileSource extends SchemedSource with LocalSourceOverride with Hf } tryTtp match { - case Success(s) => s + case Success(s: Tap[_, _, _]) => s case Failure(e) => throw new java.lang.IllegalArgumentException(s"Failed to create tap for: $toString, with error: ${e.getMessage}", e) } } @@ -288,8 +288,8 @@ abstract class FileSource extends SchemedSource with LocalSourceOverride with Hf } } - protected def createHdfsReadTap(hdfsMode: Hdfs): Tap[JobConf, _, _] = { - val taps: List[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] = + protected def createHdfsReadTap(hdfsMode: Hdfs): Tap[_ <: Configuration, _, _] = { + val taps: List[Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]]] = goodHdfsPaths(hdfsMode) .toList.map { path => CastHfsTap(createHfsTap(hdfsScheme, path, sinkMode)) } taps.size match { @@ -306,8 +306,8 @@ abstract class FileSource extends SchemedSource with LocalSourceOverride with Hf } } -class ScaldingMultiSourceTap(taps: Seq[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]]) - extends MultiSourceTap[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], JobConf, RecordReader[_, _]](taps: _*) { +class ScaldingMultiSourceTap(taps: Seq[Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]]]) + extends MultiSourceTap[Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]], Configuration, RecordReader[_, _]](taps: _*) { private final val randomId = UUID.randomUUID.toString override def getIdentifier() = randomId override def hashCode: Int = randomId.hashCode @@ -402,9 +402,9 @@ trait SuccessFileSource extends FileSource { * Put another way, this runs a Hadoop tap outside of Hadoop in the Cascading local mode */ trait LocalTapSource extends LocalSourceOverride { - override def createLocalTap(sinkMode: SinkMode): Tap[JobConf, _, _] = { + override def createLocalTap(sinkMode: SinkMode): Tap[Configuration, _, _] = { val taps = localPaths.map { p => - new LocalTap(p, hdfsScheme, sinkMode).asInstanceOf[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] + new LocalTap(p, hdfsScheme, sinkMode).asInstanceOf[Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]]] }.toSeq taps match { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala b/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala index 230378d31f..5c9fdf77ca 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala @@ -17,6 +17,7 @@ package com.twitter.scalding import cascading.tap.hadoop.Hfs import cascading.tap.SinkMode +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf import cascading.flow.FlowProcess import org.apache.hadoop.mapred.RecordReader @@ -24,10 +25,10 @@ import org.apache.hadoop.mapred.OutputCollector import cascading.scheme.Scheme private[scalding] class ConfPropertiesHfsTap(config: Config, - scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _], + scheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _], stringPath: String, sinkMode: SinkMode) extends Hfs(scheme, stringPath, sinkMode) { - override def sourceConfInit(process: FlowProcess[JobConf], conf: JobConf): Unit = { + override def sourceConfInit(process: FlowProcess[_ <: Configuration], conf: Configuration): Unit = { config.toMap.foreach { case (k, v) => conf.set(k, v) @@ -35,7 +36,7 @@ private[scalding] class ConfPropertiesHfsTap(config: Config, super.sourceConfInit(process, conf) } - override def sinkConfInit(process: FlowProcess[JobConf], conf: JobConf): Unit = { + override def sinkConfInit(process: FlowProcess[_ <: Configuration], conf: Configuration): Unit = { config.toMap.foreach { case (k, v) => conf.set(k, v) @@ -55,7 +56,7 @@ trait HfsConfPropertySetter extends HfsTapProvider { def tapConfig: Config = Config.empty override def createHfsTap( - scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _], + scheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _], path: String, sinkMode: SinkMode): Hfs = new ConfPropertiesHfsTap(tapConfig, scheme, path, sinkMode) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Job.scala b/scalding-core/src/main/scala/com/twitter/scalding/Job.scala index 787986e5a6..75f3fad069 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Job.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Job.scala @@ -172,7 +172,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable { val base = Config.empty .setListSpillThreshold(defaultSpillThreshold) .setMapSpillThreshold(defaultSpillThreshold) - .setMapSideAggregationThreshold(defaultSpillThreshold) + .setMapSideAggregationCapacity(defaultSpillThreshold) // This is setting a property for cascading/driven AppProps.addApplicationFramework(null, @@ -254,7 +254,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable { FlowStateMap.clear(flowDef) } - protected def handleStats(statsData: CascadingStats) { + protected def handleStats(statsData: CascadingStats[_]) { scaldingCascadingStats = Some(statsData) // TODO: Why the two ways to do stats? Answer: jank-den. if (args.boolean("scalding.flowstats")) { @@ -281,7 +281,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable { // This awful name is designed to avoid collision // with subclasses @transient - private[scalding] var scaldingCascadingStats: Option[CascadingStats] = None + private[scalding] var scaldingCascadingStats: Option[CascadingStats[_]] = None /** * Save the Flow object after a run to allow clients to inspect the job. diff --git a/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala b/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala index dac1f1a720..33584813a2 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala @@ -21,7 +21,7 @@ import cascading.stats.{ CascadeStats, CascadingStats, FlowStats } import scala.util.{ Failure, Try } object JobStats { - def apply(stats: CascadingStats): JobStats = { + def apply(stats: CascadingStats[_]): JobStats = { val m = statsMap(stats) new JobStats( stats match { @@ -30,14 +30,14 @@ object JobStats { }) } - private def counterMap(stats: CascadingStats): Map[String, Map[String, Long]] = + private def counterMap(stats: CascadingStats[_]): Map[String, Map[String, Long]] = stats.getCounterGroups.asScala.map { group => (group, stats.getCountersFor(group).asScala.map { counter => (counter, stats.getCounterValue(group, counter)) }.toMap) }.toMap - private def statsMap(stats: CascadingStats): Map[String, Any] = + private def statsMap(stats: CascadingStats[_]): Map[String, Any] = Map( "counters" -> counterMap(stats), "duration" -> stats.getDuration, diff --git a/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala b/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala index 148f3b0eb1..874f84b19b 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala @@ -59,7 +59,7 @@ object CascadeTest { class JobTest(cons: (Args) => Job) { private var argsMap = Map[String, List[String]]() private val callbacks = Buffer[() => Unit]() - private val statsCallbacks = Buffer[(CascadingStats) => Unit]() + private val statsCallbacks = Buffer[(CascadingStats[_]) => Unit]() // TODO: Switch the following maps and sets from Source to String keys // to guard for scala equality bugs private var sourceMap: (Source) => Option[Buffer[Tuple]] = { _ => None } @@ -124,13 +124,13 @@ class JobTest(cons: (Args) => Job) { // If this test is checking for multiple jobs chained by next, this only checks // for the counters in the final job's FlowStat. def counter(counter: String, group: String = Stats.ScaldingGroup)(op: Long => Unit) = { - statsCallbacks += ((stats: CascadingStats) => op(Stats.getCounterValue(counter, group)(stats))) + statsCallbacks += ((stats: CascadingStats[_]) => op(Stats.getCounterValue(counter, group)(stats))) this } // Used to check an assertion on all custom counters of a given scalding job. def counters(op: Map[String, Long] => Unit) = { - statsCallbacks += ((stats: CascadingStats) => op(Stats.getAllCustomCounters()(stats))) + statsCallbacks += ((stats: CascadingStats[_]) => op(Stats.getAllCustomCounters()(stats))) this } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala b/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala index 896c63496a..e754e784ab 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala @@ -44,11 +44,11 @@ class MemoryTap[In, Out](val scheme: Scheme[Properties, In, Out, _, _], val tupl override def getModifiedTime(conf: Properties) = if (resourceExists(conf)) modifiedTime else 0L override lazy val getIdentifier: String = scala.math.random.toString - override def openForRead(flowProcess: FlowProcess[Properties], input: In) = { + override def openForRead(flowProcess: FlowProcess[_ <: Properties], input: In) = { new TupleEntryChainIterator(scheme.getSourceFields, tupleBuffer.toIterator.asJava) } - override def openForWrite(flowProcess: FlowProcess[Properties], output: Out): TupleEntryCollector = { + override def openForWrite(flowProcess: FlowProcess[_ <: Properties], output: Out): TupleEntryCollector = { tupleBuffer.clear new MemoryTupleEntryCollector(tupleBuffer, this) } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala index eb70fcdf9a..3d2b77c69d 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala @@ -65,15 +65,63 @@ object Mode { val CascadingFlowConnectorClassKey = "cascading.flow.connector.class" val CascadingFlowProcessClassKey = "cascading.flow.process.class" + val CascadingFlowProcessConfigClassKey = "cascading.flow.process.config.class" + + case class FabricSelector(flowConnectorClassName: String, flowProcessClassName: String, flowProcessConfigClassName: String) { + def exists = { + try { + val k1 = Class.forName(flowConnectorClassName) + val k2 = Class.forName(flowProcessClassName) + val k3 = Class.forName(flowProcessConfigClassName) + (k1 != null) && (k2 != null) && (k3 != null) + } catch { + case cnfe: ClassNotFoundException => false + } + } + + def selectInto(config: Configuration) = { + config.set(CascadingFlowConnectorClassKey, flowConnectorClassName) + config.set(CascadingFlowProcessClassKey, flowProcessClassName) + config.set(CascadingFlowProcessConfigClassKey, flowProcessConfigClassName) + } + } val DefaultHadoopFlowConnector = "cascading.flow.hadoop.HadoopFlowConnector" val DefaultHadoopFlowProcess = "cascading.flow.hadoop.HadoopFlowProcess" + val DefaultHadoopFlowProcessConfig = "org.apache.hadoop.mapred.JobConf" + val DefaultHadoopFabric = FabricSelector(DefaultHadoopFlowConnector, DefaultHadoopFlowProcess, DefaultHadoopFlowProcessConfig) val DefaultHadoop2Mr1FlowConnector = "cascading.flow.hadoop2.Hadoop2MR1FlowConnector" - val DefaultHadoop2Mr1FlowProcess = "cascading.flow.hadoop.HadoopFlowProcess" // no Hadoop2MR1FlowProcess as of Cascading 3.0.0-wip-75? + val DefaultHadoop2Mr1FlowProcess = DefaultHadoopFlowProcess // no Hadoop2MR1FlowProcess as of Cascading 3.0.0-wip-75? + val DefaultHadoop2Mr1FlowProcessConfig = DefaultHadoopFlowProcessConfig + val DefaultHadoop2Mr1Fabric = FabricSelector(DefaultHadoop2Mr1FlowConnector, DefaultHadoop2Mr1FlowProcess, DefaultHadoop2Mr1FlowProcessConfig) val DefaultHadoop2TezFlowConnector = "cascading.flow.tez.Hadoop2TezFlowConnector" val DefaultHadoop2TezFlowProcess = "cascading.flow.tez.Hadoop2TezFlowProcess" + val DefaultHadoop2TezFlowProcessConfig = "org.apache.tez.dag.api.TezConfiguration" + val DefaultHadoop2TezFabric = FabricSelector(DefaultHadoop2TezFlowConnector, DefaultHadoop2TezFlowProcess, DefaultHadoop2TezFlowProcessConfig) + + val DefaultFlinkFlowConnector = "com.dataartisans.flink.cascading.FlinkConnector" + val DefaultFlinkFlowProcess = "com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess" + val DefaultFlinkFlowProcessConfiguration = "org.apache.hadoop.conf.Configuration" + val DefaultFlinkFabric = FabricSelector(DefaultFlinkFlowConnector, DefaultFlinkFlowProcess, DefaultFlinkFlowProcessConfiguration) + + private lazy val selectedFabric = { + val candidates = Seq(DefaultHadoop2TezFabric, DefaultHadoopFabric, DefaultHadoop2Mr1Fabric, DefaultFlinkFabric) + + val selected = candidates.find(_.exists) + if (selected.isEmpty) { + throw new IllegalArgumentException("Can't find a default Cascading fabric. Have you put one in the CLASSPATH?") + } + + LoggerFactory.getLogger(getClass) + .info(s"Using Cascading Flow Connector: ${selected.get.flowConnectorClassName} found in CLASSPATH") + selected.get + } + + def setDefaultFabricFromClasspath(config: Configuration) = { + selectedFabric.selectInto(config) + } // This should be passed ALL the args supplied after the job name def apply(args: Args, config: Configuration): Mode = { @@ -85,22 +133,23 @@ object Mode { if (args.boolean("local")) Local(strictSources) - else if (args.boolean("hdfs")) /* FIXME: should we start printing deprecation warnings ? It's okay to set manually c.f.*.class though */ + else if (args.boolean("hdfs")) { + setDefaultFabricFromClasspath(config) Hdfs(strictSources, config) - else if (args.boolean("hadoop1")) { - config.set(CascadingFlowConnectorClassKey, DefaultHadoopFlowConnector) - config.set(CascadingFlowProcessClassKey, DefaultHadoopFlowProcess) + } else if (args.boolean("hadoop1")) { + DefaultHadoopFabric.selectInto(config) Hdfs(strictSources, config) } else if (args.boolean("hadoop2-mr1")) { - config.set(CascadingFlowConnectorClassKey, DefaultHadoop2Mr1FlowConnector) - config.set(CascadingFlowProcessClassKey, DefaultHadoop2Mr1FlowProcess) + DefaultHadoop2Mr1Fabric.selectInto(config) Hdfs(strictSources, config) } else if (args.boolean("hadoop2-tez")) { - config.set(CascadingFlowConnectorClassKey, DefaultHadoop2TezFlowConnector) - config.set(CascadingFlowProcessClassKey, DefaultHadoop2TezFlowProcess) + DefaultHadoop2TezFabric.selectInto(config) + Hdfs(strictSources, config) + } else if (args.boolean("flink")) { + DefaultFlinkFabric.selectInto(config) Hdfs(strictSources, config) } else - throw ArgsException("[ERROR] Mode must be one of --local, --hadoop1, --hadoop2-mr1, --hadoop2-tez or --hdfs, you provided none") + throw ArgsException("[ERROR] Mode must be one of --local, --hadoop1, --hadoop2-mr1, --hadoop2-tez, --flink or --hdfs, you provided none") } } @@ -156,17 +205,37 @@ trait HadoopMode extends Mode { // TODO unlike newFlowConnector, this does not look at the Job.config override def openForRead(config: Config, tap: Tap[_, _, _]) = { - val htap = tap.asInstanceOf[Tap[JobConf, _, _]] - val conf = new JobConf(true) // initialize the default config - // copy over Config - config.toMap.foreach{ case (k, v) => conf.set(k, v) } + val htap: Tap[Configuration, _, _] = tap.asInstanceOf[Tap[Configuration, _, _]] val flowProcessClass = jobConf.get(Mode.CascadingFlowProcessClassKey, Mode.DefaultHadoopFlowProcess) + val flowProcessConfigClass = jobConf.get(Mode.CascadingFlowProcessConfigClassKey, Mode.DefaultHadoopFlowProcessConfig) - val fp = try { + val (fp, conf) = try { val clazz = Class.forName(flowProcessClass) - val ctor = clazz.getConstructor(classOf[JobConf]) - ctor.newInstance(conf).asInstanceOf[FlowProcess[JobConf]] + val confClazz = Class.forName(flowProcessConfigClass) + if (!classOf[Configuration].isAssignableFrom(confClazz)) { + throw new IllegalArgumentException(s"FlowProcess configuration type ${confClazz} does not implement ${classOf[Configuration]}") + } + + val conf = { + try { + /* first constructor attempted: supposed to accept a Boolean where "true" means "load system defaults" */ + val confCtor = confClazz.getConstructor(java.lang.Boolean.TYPE) + confCtor.newInstance(java.lang.Boolean.TRUE).asInstanceOf[Configuration] + } catch { + case _: NoSuchMethodError | _: NoSuchMethodException => { + /* fallback: the Configuration should have a default constructor */ + val confCtor = confClazz.getConstructor() + confCtor.newInstance().asInstanceOf[Configuration] + } + } + } // initialize the default config + // copy over Config + config.toMap.foreach{ case (k, v) => conf.set(k, v) } + + val ctor = clazz.getConstructor(confClazz) + val inst = ctor.newInstance(conf) + (inst.asInstanceOf[FlowProcess[_ <: Configuration]], conf) } catch { case ncd: ClassNotFoundException => { throw new ModeLoadException("Failed to load Cascading flow process class " + flowProcessClass, ncd) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala index 578c76639c..8b28fcc011 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala @@ -18,8 +18,9 @@ package com.twitter.scalding { import cascading.operation._ import cascading.tuple._ import cascading.flow._ - import cascading.pipe.assembly.AggregateBy + import cascading.pipe.assembly.{ AggregateByProps, AggregateBy } import com.twitter.chill.MeatLocker + import scala.annotation.meta.{ field, param } import scala.collection.JavaConverters._ import com.twitter.algebird.{ Semigroup, SummingCache } @@ -34,7 +35,7 @@ package com.twitter.scalding { } } - class FlatMapFunction[S, T](@transient fn: S => TraversableOnce[T], fields: Fields, + class FlatMapFunction[S, T](@(transient @param) fn: S => TraversableOnce[T], fields: Fields, conv: TupleConverter[S], set: TupleSetter[T]) extends BaseOperation[Any](fields) with Function[Any] with ScaldingPrepare[Any] { val lockedFn = Externalizer(fn) @@ -46,7 +47,7 @@ package com.twitter.scalding { } } - class MapFunction[S, T](@transient fn: S => T, fields: Fields, + class MapFunction[S, T](@(transient @param) fn: S => T, fields: Fields, conv: TupleConverter[S], set: TupleSetter[T]) extends BaseOperation[Any](fields) with Function[Any] with ScaldingPrepare[Any] { val lockedFn = Externalizer(fn) @@ -67,7 +68,7 @@ package com.twitter.scalding { } } - class CleanupIdentityFunction(@transient fn: () => Unit) + class CleanupIdentityFunction(@(transient @param) fn: () => Unit) extends BaseOperation[Any](Fields.ALL) with Function[Any] with ScaldingPrepare[Any] { val lockedEf = Externalizer(fn) @@ -80,7 +81,7 @@ package com.twitter.scalding { } } - class CollectFunction[S, T](@transient fn: PartialFunction[S, T], fields: Fields, + class CollectFunction[S, T](@(transient @param) fn: PartialFunction[S, T], fields: Fields, conv: TupleConverter[S], set: TupleSetter[T]) extends BaseOperation[Any](fields) with Function[Any] with ScaldingPrepare[Any] { @@ -99,7 +100,7 @@ package com.twitter.scalding { /** * An implementation of map-side combining which is appropriate for associative and commutative functions * If a cacheSize is given, it is used, else we query - * the config for cascading.aggregateby.threshold (standard cascading param for an equivalent case) + * the config for cascading.aggregateby.capacity (standard cascading param for an equivalent case) * else we use a default value of 100,000 * * This keeps a cache of keys up to the cache-size, summing values as keys collide @@ -124,7 +125,7 @@ package com.twitter.scalding { * the typed-API. */ class MapsideReduce[V]( - @transient commutativeSemigroup: Semigroup[V], + @(transient @param) commutativeSemigroup: Semigroup[V], keyFields: Fields, valueFields: Fields, cacheSize: Option[Int])(implicit conv: TupleConverter[V], set: TupleSetter[V]) extends BaseOperation[SummingCache[Tuple, V]](Fields.join(keyFields, valueFields)) @@ -134,7 +135,7 @@ package com.twitter.scalding { val boxedSemigroup = Externalizer(commutativeSemigroup) val DEFAULT_CACHE_SIZE = 100000 - val SIZE_CONFIG_KEY = AggregateBy.AGGREGATE_BY_THRESHOLD + val SIZE_CONFIG_KEY = AggregateByProps.AGGREGATE_BY_CAPACITY def cacheSize(fp: FlowProcess[_]): Int = cacheSize.orElse { @@ -194,8 +195,8 @@ package com.twitter.scalding { * BaseOperation with support for context */ abstract class SideEffectBaseOperation[C]( - @transient bf: => C, // begin function returns a context - @transient ef: C => Unit, // end function to clean up context object + @(transient @param) bf: => C, // begin function returns a context + @(transient @param) ef: C => Unit, // end function to clean up context object fields: Fields) extends BaseOperation[C](fields) with ScaldingPrepare[C] { val lockedBf = Externalizer(() => bf) val lockedEf = Externalizer(ef) @@ -213,7 +214,7 @@ package com.twitter.scalding { */ class SideEffectMapFunction[S, C, T]( bf: => C, // begin function returns a context - @transient fn: (C, S) => T, // function that takes a context and a tuple and generate a new tuple + @(transient @param) fn: (C, S) => T, // function that takes a context and a tuple and generate a new tuple ef: C => Unit, // end function to clean up context object fields: Fields, conv: TupleConverter[S], @@ -233,7 +234,7 @@ package com.twitter.scalding { */ class SideEffectFlatMapFunction[S, C, T]( bf: => C, // begin function returns a context - @transient fn: (C, S) => TraversableOnce[T], // function that takes a context and a tuple, returns TraversableOnce of T + @(transient @param) fn: (C, S) => TraversableOnce[T], // function that takes a context and a tuple, returns TraversableOnce of T ef: C => Unit, // end function to clean up context object fields: Fields, conv: TupleConverter[S], @@ -247,7 +248,7 @@ package com.twitter.scalding { } } - class FilterFunction[T](@transient fn: T => Boolean, conv: TupleConverter[T]) + class FilterFunction[T](@(transient @param) fn: T => Boolean, conv: TupleConverter[T]) extends BaseOperation[Any] with Filter[Any] with ScaldingPrepare[Any] { val lockedFn = Externalizer(fn) @@ -258,7 +259,7 @@ package com.twitter.scalding { // All the following are operations for use in GroupBuilder - class FoldAggregator[T, X](@transient fn: (X, T) => X, @transient init: X, fields: Fields, + class FoldAggregator[T, X](@(transient @param) fn: (X, T) => X, @(transient @param) init: X, fields: Fields, conv: TupleConverter[T], set: TupleSetter[X]) extends BaseOperation[X](fields) with Aggregator[X] with ScaldingPrepare[X] { val lockedFn = Externalizer(fn) @@ -288,9 +289,9 @@ package com.twitter.scalding { * fields are the declared fields of this aggregator */ class MRMAggregator[T, X, U]( - @transient inputFsmf: T => X, - @transient inputRfn: (X, X) => X, - @transient inputMrfn: X => U, + @(transient @param) inputFsmf: T => X, + @(transient @param) inputRfn: (X, X) => X, + @(transient @param) inputMrfn: X => U, fields: Fields, conv: TupleConverter[T], set: TupleSetter[U]) extends BaseOperation[Tuple](fields) with Aggregator[Tuple] with ScaldingPrepare[Tuple] { val fsmf = Externalizer(inputFsmf) @@ -388,8 +389,8 @@ package com.twitter.scalding { * style purity. */ class MRMFunctor[T, X]( - @transient inputMrfn: T => X, - @transient inputRfn: (X, X) => X, + @(transient @param) inputMrfn: T => X, + @(transient @param) inputRfn: (X, X) => X, fields: Fields, conv: TupleConverter[T], set: TupleSetter[X]) extends FoldFunctor[X](fields) { @@ -423,8 +424,8 @@ package com.twitter.scalding { new MRMAggregator[X, X, U](args => args, rfn, mfn2, declaredFields, midConv, endSet)) class BufferOp[I, T, X]( - @transient init: I, - @transient inputIterfn: (I, Iterator[T]) => TraversableOnce[X], + @(transient @param) init: I, + @(transient @param) inputIterfn: (I, Iterator[T]) => TraversableOnce[X], fields: Fields, conv: TupleConverter[T], set: TupleSetter[X]) extends BaseOperation[Any](fields) with Buffer[Any] with ScaldingPrepare[Any] { val iterfn = Externalizer(inputIterfn) @@ -442,9 +443,9 @@ package com.twitter.scalding { * A buffer that allows state object to be set up and tear down. */ class SideEffectBufferOp[I, T, C, X]( - @transient init: I, + @(transient @param) init: I, bf: => C, // begin function returns a context - @transient inputIterfn: (I, C, Iterator[T]) => TraversableOnce[X], + @(transient @param) inputIterfn: (I, C, Iterator[T]) => TraversableOnce[X], ef: C => Unit, // end function to clean up context object fields: Fields, conv: TupleConverter[T], @@ -480,7 +481,7 @@ package com.twitter.scalding { class TypedBufferOp[K, V, U]( conv: TupleConverter[K], convV: TupleConverter[V], - @transient reduceFn: (K, Iterator[V]) => Iterator[U], + @(transient @param) reduceFn: (K, Iterator[V]) => Iterator[U], valueField: Fields) extends BaseOperation[Any](valueField) with Buffer[Any] with ScaldingPrepare[Any] { val reduceFnSer = Externalizer(reduceFn) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Source.scala b/scalding-core/src/main/scala/com/twitter/scalding/Source.scala index 094613235c..7d68b325f4 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Source.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Source.scala @@ -27,6 +27,7 @@ import cascading.tap.{ Tap, SourceTap, SinkTap } import cascading.tuple.{ Fields, Tuple => CTuple, TupleEntry, TupleEntryCollector, TupleEntryIterator } import cascading.pipe.Pipe +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.OutputCollector @@ -59,7 +60,7 @@ class InvalidSourceTap(val hdfsPaths: Iterable[String]) extends SourceTap[JobCon override def getModifiedTime(conf: JobConf): Long = 0L - override def openForRead(flow: FlowProcess[JobConf], input: RecordReader[_, _]): TupleEntryIterator = + override def openForRead(flow: FlowProcess[_ <: JobConf], input: RecordReader[_, _]): TupleEntryIterator = sys.error(s"InvalidSourceTap: No good paths in $hdfsPaths") override def resourceExists(conf: JobConf): Boolean = false @@ -75,7 +76,7 @@ class InvalidSourceTap(val hdfsPaths: Iterable[String]) extends SourceTap[JobCon // 4. source.validateTaps (throws InvalidSourceException) // In the worst case if the flow plan is misconfigured, // openForRead on mappers should fail when using this tap. - override def sourceConfInit(flow: FlowProcess[JobConf], conf: JobConf): Unit = { + override def sourceConfInit(flow: FlowProcess[_ <: JobConf], conf: JobConf): Unit = { conf.setInputFormat(classOf[cascading.tap.hadoop.io.MultiInputFormat]) super.sourceConfInit(flow, conf) } @@ -94,13 +95,13 @@ case object Write extends AccessMode object HadoopSchemeInstance { def apply(scheme: Scheme[_, _, _, _, _]) = - scheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + scheme.asInstanceOf[Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _]] } object CastHfsTap { // The scala compiler has problems with the generics in Cascading - def apply(tap: Hfs): Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]] = - tap.asInstanceOf[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] + def apply(tap: Hfs): Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]] = + tap.asInstanceOf[Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]]] } /** @@ -251,7 +252,8 @@ class NullTap[Config, Input, Output, SourceContext, SinkContext] SinkMode.UPDATE) { def getIdentifier = "nullTap" - def openForWrite(flowProcess: FlowProcess[Config], output: Output) = + + def openForWrite(flowProcess: FlowProcess[_ <: Config], output: Output) = new TupleEntryCollector { override def add(te: TupleEntry) {} override def add(t: CTuple) {} diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala index 94871425e0..7eb6d45745 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala @@ -62,11 +62,11 @@ object Stats { // When getting a counter value, cascadeStats takes precedence (if set) and // flowStats is used after that. Returns None if neither is defined. - def getCounterValue(key: StatKey)(implicit cascadingStats: CascadingStats): Long = + def getCounterValue(key: StatKey)(implicit cascadingStats: CascadingStats[_]): Long = cascadingStats.getCounterValue(key.group, key.counter) // Returns a map of all custom counter names and their counts. - def getAllCustomCounters()(implicit cascadingStats: CascadingStats): Map[String, Long] = { + def getAllCustomCounters()(implicit cascadingStats: CascadingStats[_]): Map[String, Long] = { val counts = for { counter <- cascadingStats.getCountersFor(ScaldingGroup).asScala value = getCounterValue(counter) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TemplateSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/TemplateSource.scala deleted file mode 100644 index 32ed32ed0b..0000000000 --- a/scalding-core/src/main/scala/com/twitter/scalding/TemplateSource.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* -Copyright 2013 Inkling, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package com.twitter.scalding - -import cascading.tap.hadoop.Hfs -import cascading.tap.hadoop.{ TemplateTap => HTemplateTap } -import cascading.tap.local.FileTap -import cascading.tap.local.{ TemplateTap => LTemplateTap } -import cascading.tap.SinkMode -import cascading.tap.Tap -import cascading.tuple.Fields - -/** - * This is a base class for template based output sources - */ -abstract class TemplateSource extends SchemedSource with HfsTapProvider { - - // The root path of the templated output. - def basePath: String - // The template as a java Formatter string. e.g. %s/%s for a two part template. - def template: String - // The fields to apply to the template. - def pathFields: Fields = Fields.ALL - - /** - * Creates the template tap. - * - * @param readOrWrite Describes if this source is being read from or written to. - * @param mode The mode of the job. (implicit) - * - * @returns A cascading TemplateTap. - */ - override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { - readOrWrite match { - case Read => throw new InvalidSourceException("Cannot use TemplateSource for input") - case Write => { - mode match { - case Local(_) => { - val localTap = new FileTap(localScheme, basePath, sinkMode) - new LTemplateTap(localTap, template, pathFields) - } - case hdfsMode @ Hdfs(_, _) => { - val hfsTap = createHfsTap(hdfsScheme, basePath, sinkMode) - new HTemplateTap(hfsTap, template, pathFields) - } - case hdfsTest @ HadoopTest(_, _) => { - val hfsTap = createHfsTap(hdfsScheme, hdfsTest.getWritePathFor(this), sinkMode) - new HTemplateTap(hfsTap, template, pathFields) - } - case _ => TestTapFactory(this, hdfsScheme).createTap(readOrWrite) - } - } - } - } - - /** - * Validates the taps, makes sure there are no nulls as the path or template. - * - * @param mode The mode of the job. - */ - override def validateTaps(mode: Mode): Unit = { - if (basePath == null) { - throw new InvalidSourceException("basePath cannot be null for TemplateTap") - } else if (template == null) { - throw new InvalidSourceException("template cannot be null for TemplateTap") - } - } -} - -/** - * An implementation of TSV output, split over a template tap. - * - * @param basePath The root path for the output. - * @param template The java formatter style string to use as the template. e.g. %s/%s. - * @param pathFields The set of fields to apply to the path. - * @param writeHeader Flag to indicate that the header should be written to the file. - * @param sinkMode How to handle conflicts with existing output. - * @param fields The set of fields to apply to the output. - */ -case class TemplatedTsv( - override val basePath: String, - override val template: String, - override val pathFields: Fields = Fields.ALL, - override val writeHeader: Boolean = false, - override val sinkMode: SinkMode = SinkMode.REPLACE, - override val fields: Fields = Fields.ALL) - extends TemplateSource with DelimitedScheme - -/** - * An implementation of SequenceFile output, split over a template tap. - * - * @param basePath The root path for the output. - * @param template The java formatter style string to use as the template. e.g. %s/%s. - * @param sequenceFields The set of fields to use for the sequence file. - * @param pathFields The set of fields to apply to the path. - * @param sinkMode How to handle conflicts with existing output. - */ -case class TemplatedSequenceFile( - override val basePath: String, - override val template: String, - val sequenceFields: Fields = Fields.ALL, - override val pathFields: Fields = Fields.ALL, - override val sinkMode: SinkMode = SinkMode.REPLACE) - extends TemplateSource with SequenceFileScheme { - - override val fields = sequenceFields -} - diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala b/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala index db3426a9ff..f50c5d125d 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala @@ -25,6 +25,7 @@ import cascading.scheme.NullScheme import java.io.{ Serializable, InputStream, OutputStream } +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.RecordReader @@ -44,9 +45,9 @@ object TestTapFactory extends Serializable { override def sourceFields: Fields = fields override def sinkFields: Fields = fields } - def apply[A, B](src: Source, scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], A, B]): TestTapFactory = apply(src, scheme, SinkMode.REPLACE) + def apply[A, B](src: Source, scheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], A, B]): TestTapFactory = apply(src, scheme, SinkMode.REPLACE) def apply[A, B](src: Source, - scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], A, B], sinkMode: SinkMode): TestTapFactory = + scheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], A, B], sinkMode: SinkMode): TestTapFactory = new TestTapFactory(src, sinkMode) { override def hdfsScheme = Some(scheme) } } @@ -57,7 +58,7 @@ class TestTapFactory(src: Source, sinkMode: SinkMode) extends Serializable { def sinkFields: Fields = hdfsScheme.map { _.getSinkFields }.getOrElse(sys.error("No sinkFields defined")) - def hdfsScheme: Option[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] = None + def hdfsScheme: Option[Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _]] = None def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { mode match { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala index d433fa24fd..d3d330be3e 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala @@ -31,12 +31,14 @@ object Common { private def unrollTaps(taps: Seq[Tap[_, _, _]]): Seq[Tap[_, _, _]] = taps.flatMap { case multi: CompositeTap[_] => - unrollTaps(multi.getChildTaps.asScala.toSeq) + unrollTaps(multi.getChildTaps.asScala.map(x => x.asInstanceOf[Tap[_, _, _]]).toSeq) case t => Seq(t) } - def unrollTaps(step: FlowStep[JobConf]): Seq[Tap[_, _, _]] = - unrollTaps(step.getSources.asScala.toSeq) + def unrollTaps(step: FlowStep[_ <: JobConf]): Seq[Tap[_, _, _]] = { + val x = step.getFlowNodeGraph.getSourceTaps.asScala.toSeq + unrollTaps(x.toSeq) + } /** * Get the total size of the file(s) specified by the Hfs, which may contain a glob @@ -156,7 +158,9 @@ object ReducerEstimatorStepStrategy extends FlowStepStrategy[JobConf] { preds: JList[FlowStep[JobConf]], step: FlowStep[JobConf]): Unit = { val conf = step.getConfig - val stepNumReducers = conf.get(Config.HadoopNumReducers) + val stepNumReducers = Option(conf.get(Config.HadoopNumReducersLegacy)) + .orElse(Option(conf.get(Config.HadoopNumReducers2))) + .getOrElse("-1") // whether the reducers have been set explicitly with `withReducers` val setExplicitly = conf.getBoolean(Config.WithReducersSetExplicitly, false) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala b/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala index 71777fad60..8d9acbe942 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala @@ -77,7 +77,7 @@ object CascadingBinaryComparator { def getDescriptionsForMissingOrdSer[U](bfs: BaseFlowStep[U]): Option[String] = // does this job have any Splices without OrderedSerialization: - if (bfs.getGraph.vertexSet.asScala.exists { + if (bfs.getElementGraph.vertexSet.asScala.exists { case gb: GroupBy => check(gb).isFailure case cg: CoGroup => check(cg).isFailure case _ => false // only do sorting in groupBy/cogroupBy @@ -94,7 +94,7 @@ object CascadingBinaryComparator { else { val badSteps = missing.size val msg = missing.zipWithIndex.map { case (msg, idx) => s"$msg" }.mkString - error(s"There are $badSteps missing OrderedSerializations: $msg") + sys.error(s"There are $badSteps missing OrderedSerializations: $msg") } } } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/source/CodecSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/source/CodecSource.scala index 394d7b7d10..10d3b82a9a 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/source/CodecSource.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/source/CodecSource.scala @@ -26,6 +26,7 @@ import com.twitter.scalding._ import java.util.Arrays import org.apache.hadoop.io.BytesWritable +import scala.annotation.meta.param import scala.collection.JavaConverters._ /** @@ -46,7 +47,7 @@ object CodecSource { def apply[T](paths: String*)(implicit codec: Injection[T, Array[Byte]]) = new CodecSource[T](paths) } -class CodecSource[T] private (val hdfsPaths: Seq[String], val maxFailures: Int = 0)(implicit @transient injection: Injection[T, Array[Byte]]) +class CodecSource[T] private (val hdfsPaths: Seq[String], val maxFailures: Int = 0)(implicit @(transient @param) injection: Injection[T, Array[Byte]]) extends FileSource with Mappable[T] with LocalTapSource { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/BijectedSourceSink.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/BijectedSourceSink.scala index 4bbe1b3807..c1270018f6 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/BijectedSourceSink.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/BijectedSourceSink.scala @@ -22,13 +22,15 @@ import com.twitter.bijection.ImplicitBijection import com.twitter.scalding._ import serialization.Externalizer +import scala.annotation.meta.param + object BijectedSourceSink { type SourceSink[T] = TypedSource[T] with TypedSink[T] def apply[T, U](parent: SourceSink[T])(implicit transformer: ImplicitBijection[T, U]): BijectedSourceSink[T, U] = new BijectedSourceSink(parent)(transformer) } -class BijectedSourceSink[T, U](parent: BijectedSourceSink.SourceSink[T])(implicit @transient transformer: ImplicitBijection[T, U]) extends TypedSource[U] with TypedSink[U] { +class BijectedSourceSink[T, U](parent: BijectedSourceSink.SourceSink[T])(implicit @(transient @param) transformer: ImplicitBijection[T, U]) extends TypedSource[U] with TypedSink[U] { val lockedBij = Externalizer(transformer) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/CoGrouped.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/CoGrouped.scala index 78ed1159d2..fcc13444a3 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/CoGrouped.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/CoGrouped.scala @@ -22,6 +22,7 @@ import cascading.pipe.{ CoGroup, Pipe } import com.twitter.scalding._ +import scala.annotation.meta.param import scala.collection.JavaConverters._ import com.twitter.scalding.serialization.Externalizer @@ -314,7 +315,7 @@ trait CoGrouped[K, +R] extends KeyedListLike[K, R, CoGrouped] with CoGroupable[K abstract class CoGroupedJoiner[K](inputSize: Int, getter: TupleGetter[K], - @transient inJoinFunction: (K, Iterator[CTuple], Seq[Iterable[CTuple]]) => Iterator[Any]) extends CJoiner { + @(transient @param) inJoinFunction: (K, Iterator[CTuple], Seq[Iterable[CTuple]]) => Iterator[Any]) extends CJoiner { /** * We have a test that should fail if Externalizer is not used here. @@ -360,7 +361,7 @@ abstract class CoGroupedJoiner[K](inputSize: Int, // If all the input pipes are unique, this works: class DistinctCoGroupJoiner[K](count: Int, getter: TupleGetter[K], - @transient joinF: (K, Iterator[CTuple], Seq[Iterable[CTuple]]) => Iterator[Any]) + @(transient @param) joinF: (K, Iterator[CTuple], Seq[Iterable[CTuple]]) => Iterator[Any]) extends CoGroupedJoiner[K](count, getter, joinF) { val distinctSize = count def distinctIndexOf(idx: Int) = idx diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index 7dd8930ca2..73f272c502 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -31,6 +31,7 @@ import com.twitter.scalding.serialization.OrderedSerialization.Result import com.twitter.scalding.serialization.macros.impl.BinaryOrdering import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._ +import scala.annotation.meta.param import scala.util.Try /** @@ -928,10 +929,10 @@ class TypedPipeFactory[T] private (@transient val next: NoStackAndThen[(FlowDef, /** * This is an instance of a TypedPipe that wraps a cascading Pipe */ -class TypedPipeInst[T] private[scalding] (@transient inpipe: Pipe, +class TypedPipeInst[T] private[scalding] (@(transient @param) inpipe: Pipe, fields: Fields, - @transient localFlowDef: FlowDef, - @transient val mode: Mode, + @(transient @param) localFlowDef: FlowDef, + @(transient @param) val mode: Mode, flatMapFn: FlatMapFn[T]) extends TypedPipe[T] { /** diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TemplateSourceTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TemplateSourceTest.scala deleted file mode 100644 index 366b5c6676..0000000000 --- a/scalding-core/src/test/scala/com/twitter/scalding/TemplateSourceTest.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* -Copyright 2013 Inkling, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package com.twitter.scalding - -import java.io.File -import scala.io.{ Source => ScalaSource } - -import org.scalatest.{ Matchers, WordSpec } - -class TemplateTestJob(args: Args) extends Job(args) { - try { - Tsv("input", ('col1, 'col2)).read.write(TemplatedTsv("base", "%s", 'col1)) - } catch { - case e: Exception => e.printStackTrace() - } -} - -class TemplateSourceTest extends WordSpec with Matchers { - import Dsl._ - "TemplatedTsv" should { - "split output by template" in { - val input = Seq(("A", 1), ("A", 2), ("B", 3)) - - // Need to save the job to allow, find the temporary directory data was written to - var job: Job = null; - def buildJob(args: Args): Job = { - job = new TemplateTestJob(args) - job - } - - JobTest(buildJob(_)) - .source(Tsv("input", ('col1, 'col2)), input) - .runHadoop - .finish - - val testMode = job.mode.asInstanceOf[HadoopTest] - - val directory = new File(testMode.getWritePathFor(TemplatedTsv("base", "%s", 'col1))) - - directory.listFiles().map({ _.getName() }).toSet shouldBe Set("A", "B") - - val aSource = ScalaSource.fromFile(new File(directory, "A/part-00000")) - val bSource = ScalaSource.fromFile(new File(directory, "B/part-00000")) - - aSource.getLines.toList shouldBe Seq("A\t1", "A\t2") - bSource.getLines.toList shouldBe Seq("B\t3") - } - } -} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedFieldsTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedFieldsTest.scala index 65e0768335..b907d68316 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/TypedFieldsTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedFieldsTest.scala @@ -27,7 +27,7 @@ class TypedFieldsTest extends WordSpec with Matchers { "throw an exception if a field is not comparable" in { val thrown = the[FlowException] thrownBy untypedJob - thrown.getMessage shouldBe "local step failed" + thrown.getMessage should startWith("local step failed") } // Now run the typed fields version diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/RequireOrderedSerializationTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/RequireOrderedSerializationTest.scala index 4d83c34cc2..1ce5ad73cb 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/typed/RequireOrderedSerializationTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/RequireOrderedSerializationTest.scala @@ -55,7 +55,7 @@ class RequireOrderedSerializationTest extends WordSpec with Matchers { .run .finish } - ex.getMessage should include("SerializationTest.scala:29") + ex.getMessage should startWith("There are 1 missing OrderedSerializations:") } } "A OrderedSerJob" should { diff --git a/scalding-hadoop-test/.gitignore b/scalding-hadoop-test/.gitignore new file mode 100644 index 0000000000..0beb3e581c --- /dev/null +++ b/scalding-hadoop-test/.gitignore @@ -0,0 +1 @@ +build/test diff --git a/scalding-hadoop-test/src/test/resources/hipster.txt b/scalding-hadoop-test/src/main/resources/hipster.txt similarity index 100% rename from scalding-hadoop-test/src/test/resources/hipster.txt rename to scalding-hadoop-test/src/main/resources/hipster.txt diff --git a/scalding-hadoop-test/src/test/resources/scores.tsv b/scalding-hadoop-test/src/main/resources/scores.tsv similarity index 100% rename from scalding-hadoop-test/src/test/resources/scores.tsv rename to scalding-hadoop-test/src/main/resources/scores.tsv diff --git a/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/DagwisePlatformTest.scala b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/DagwisePlatformTest.scala new file mode 100644 index 0000000000..6ad3db2f6c --- /dev/null +++ b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/DagwisePlatformTest.scala @@ -0,0 +1,51 @@ +package com.twitter.scalding.platform + +import cascading.flow.FlowStep +import cascading.flow.planner.BaseFlowStep +import cascading.pipe.joiner.{ JoinerClosure, InnerJoin } +import cascading.tuple.Tuple + +import com.twitter.scalding._ +import com.twitter.scalding.serialization.OrderedSerialization +import java.util.{ Iterator => JIterator } +import org.scalacheck.{ Arbitrary, Gen } +import org.scalatest.{ Ignore, Matchers, WordSpec } +import org.slf4j.{ LoggerFactory, Logger } +import scala.collection.JavaConverters._ +import scala.language.experimental.macros +import scala.math.Ordering + +/** + * This trait includes tests which only make sense on platforms where each Flow is translated + * as a single DAG onto the underlying execution fabric + * (e.g. Tez, Flink but possibly not Spark) + * + */ +trait DagwisePlatformTest extends PlatformTest { + import ConfigBridge._ + + "A TypedPipeForceToDiskWithDescriptionPipe" should { + "have a custom step name from withDescription" in { + + HadoopPlatformJobTest(new TypedPipeForceToDiskWithDescriptionJob(_), cluster) + .inspectCompletedFlow { flow => + val steps = flow.getFlowSteps.asScala + steps should have size 1 + + val labels = steps.head.getConfigValue(Config.StepDescriptions).split(",").map(_.trim).toSet + labels.contains("write words to disk") should be(true) + labels.contains("output frequency by length") should be(true) + + /* ".forceToDisk" may have an influence on Tez and other "whole DAG" processing engines but + it should not cause a new Step + + (note: "partial DAG" engines where Cascading has to generate 1 or more steps + depending on the Flow's exact topology may disagree about this; in which case + make another trait from PlatformTest to model this and put the appropriate tests) + */ + } + .run + } + } + +} diff --git a/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/HadoopPlatformJobTest.scala b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/HadoopPlatformJobTest.scala index 5075544915..45f1eb0f6a 100644 --- a/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/HadoopPlatformJobTest.scala +++ b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/HadoopPlatformJobTest.scala @@ -15,6 +15,7 @@ limitations under the License. */ package com.twitter.scalding.platform +import scala.collection.JavaConverters._ import cascading.flow.Flow import com.twitter.scalding._ import com.twitter.scalding.source.TypedText @@ -37,7 +38,7 @@ case class HadoopPlatformJobTest( dataToCreate: Seq[(String, Seq[String])] = Vector(), sourceWriters: Seq[Args => Job] = Vector.empty, sourceReaders: Seq[Mode => Unit] = Vector.empty, - flowCheckers: Seq[Flow[JobConf] => Unit] = Vector.empty) { + flowCheckers: Seq[Flow[_] => Unit] = Vector.empty) { private val LOG = LoggerFactory.getLogger(getClass) def arg(inArg: String, value: List[String]): HadoopPlatformJobTest = copy(argsMap = argsMap + (inArg -> value)) @@ -59,7 +60,7 @@ case class HadoopPlatformJobTest( def sink[T](in: Mappable[T])(toExpect: Seq[T] => Unit): HadoopPlatformJobTest = copy(sourceReaders = sourceReaders :+ { m: Mode => toExpect(in.toIterator(Config.defaultFrom(m), m).toSeq) }) - def inspectCompletedFlow(checker: Flow[JobConf] => Unit): HadoopPlatformJobTest = + def inspectCompletedFlow(checker: Flow[_] => Unit): HadoopPlatformJobTest = copy(flowCheckers = flowCheckers :+ checker) private def createSources() { @@ -98,7 +99,7 @@ case class HadoopPlatformJobTest( checkSinks() flowCheckers.foreach { checker => job.completedFlow.collect { - case f: Flow[JobConf] => checker(f) + case f: Flow[_] => checker(f) } } } @@ -107,6 +108,13 @@ case class HadoopPlatformJobTest( @annotation.tailrec private final def runJob(job: Job) { + // create cascading 3.0 planner trace files during tests + if (System.getenv.asScala.getOrElse("SCALDING_CASCADING3_DEBUG", "0") == "1") { + System.setProperty("cascading.planner.plan.path", "target/test/cascading/traceplan/" + job.name) + System.setProperty("cascading.planner.plan.transforms.path", "target/test/cascading/traceplan/" + job.name + "/transform") + System.setProperty("cascading.planner.stats.path", "target/test/cascading/traceplan/" + job.name + "/stats") + } + job.run job.clear job.next match { diff --git a/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala index d5ae569b50..bd3155cc9f 100644 --- a/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala +++ b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala @@ -135,7 +135,7 @@ class LocalCluster(mutex: Boolean = true) { classOf[com.twitter.chill.algebird.AveragedValueSerializer], classOf[com.twitter.algebird.Semigroup[_]], classOf[com.twitter.chill.KryoInstantiator], - classOf[org.jgrapht.ext.EdgeNameProvider[_]], + //classOf[org.jgrapht.ext.EdgeNameProvider[_]], classOf[org.apache.commons.lang.StringUtils], classOf[cascading.scheme.local.TextDelimited], classOf[org.apache.commons.logging.LogFactory], @@ -171,7 +171,10 @@ class LocalCluster(mutex: Boolean = true) { private def getFileForClass[T](clazz: Class[T]): File = new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI) - def mode: Mode = Hdfs(true, jobConf) + def mode: Mode = { + Mode.setDefaultFabricFromClasspath(jobConf) + Hdfs(true, jobConf) + } def putFile(file: File, location: String): Boolean = { val hdfsLocation = new Path(location) diff --git a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/platform/PlatformTest.scala b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/PlatformTest.scala similarity index 78% rename from scalding-hadoop-test/src/test/scala/com/twitter/scalding/platform/PlatformTest.scala rename to scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/PlatformTest.scala index c8f27752c6..f6790b1b0b 100644 --- a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/platform/PlatformTest.scala +++ b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/PlatformTest.scala @@ -15,6 +15,8 @@ limitations under the License. */ package com.twitter.scalding.platform +import cascading.flow.FlowStep +import cascading.flow.planner.BaseFlowStep import cascading.pipe.joiner.{ JoinerClosure, InnerJoin } import cascading.tuple.Tuple @@ -22,7 +24,7 @@ import com.twitter.scalding._ import com.twitter.scalding.serialization.OrderedSerialization import java.util.{ Iterator => JIterator } import org.scalacheck.{ Arbitrary, Gen } -import org.scalatest.{ Matchers, WordSpec } +import org.scalatest.{ Ignore, Matchers, WordSpec } import org.slf4j.{ LoggerFactory, Logger } import scala.collection.JavaConverters._ import scala.language.experimental.macros @@ -32,6 +34,7 @@ class InAndOutJob(args: Args) extends Job(args) { Tsv("input").read.write(Tsv("output")) } +/*********** RFC SECTION ***********/ object TinyJoinAndMergeJob { val peopleInput = TypedTsv[Int]("input1") val peopleData = List(1, 2, 3, 4) @@ -42,7 +45,6 @@ object TinyJoinAndMergeJob { val output = TypedTsv[(Int, Int)]("output") val outputData = List((1, 2), (2, 2), (3, 2), (4, 1)) } - class TinyJoinAndMergeJob(args: Args) extends Job(args) { import TinyJoinAndMergeJob._ @@ -55,6 +57,47 @@ class TinyJoinAndMergeJob(args: Args) extends Job(args) { (messages ++ people).groupBy('id) { _.size('count) }.write(output) } +class TinyJoinAndMergeJob2(args: Args) extends Job(args) { + import TinyJoinAndMergeJob._ + + val people = peopleInput.read.mapTo(0 -> 'id) { v: Int => v } + + val messages = messageInput.read + .mapTo(0 -> 'id) { v: Int => v } + .joinWithTiny('id -> 'id, people) + .forceToDisk + + (messages ++ people).groupBy('id) { _.size('count) }.write(output) +} + +object TinyJoinAndMergeJob3 { + val peopleInput = TypedTsv[Int]("input1") + val peopleData = List(1, 2, 3, 4) + + val messageInput = TypedTsv[Int]("input2") + val messageData = List(1, 2, 3) + + val peopleInput3 = TypedTsv[Int]("input3") + val peopleData3 = peopleData + + val output = TypedTsv[(Int, Int)]("output") + val outputData = List((1, 2), (2, 2), (3, 2), (4, 1)) +} +class TinyJoinAndMergeJob3(args: Args) extends Job(args) { + import TinyJoinAndMergeJob3._ + + val people = peopleInput.read.mapTo(0 -> 'id) { v: Int => v } + val people3 = peopleInput3.read.mapTo(0 -> 'id) { v: Int => v } + + val messages = messageInput.read + .mapTo(0 -> 'id) { v: Int => v } + .joinWithTiny('id -> 'id, people) + .forceToDisk + + (messages ++ people3).groupBy('id) { _.size('count) }.write(output) +} +/************** END RFC SECTION ************/ + object TsvNoCacheJob { val dataInput = TypedTsv[String]("fakeInput") val data = List("-0.2f -0.3f -0.5f", "-0.1f", "-0.5f") @@ -270,7 +313,8 @@ class CheckForFlowProcessInTypedJob(args: Args) extends Job(args) { // Keeping all of the specifications in the same tests puts the result output all together at the end. // This is useful given that the Hadoop MiniMRCluster and MiniDFSCluster spew a ton of logging. -class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest { +trait PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest { + import ConfigBridge._ "An InAndOutTest" should { val inAndOut = Seq("a", "b", "c") @@ -283,10 +327,12 @@ class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest } } + /********** RFC SECTION **********/ "A TinyJoinAndMergeJob" should { import TinyJoinAndMergeJob._ - "merge and joinWithTiny shouldn't duplicate data" in { + /* FIXME: @cwensel says this leads to an unsupportable query plan, and from Cascading 3.0 this is rejected… */ + "merge and joinWithTiny shouldn't duplicate data" ignore { HadoopPlatformJobTest(new TinyJoinAndMergeJob(_), cluster) .source(peopleInput, peopleData) .source(messageInput, messageData) @@ -295,6 +341,32 @@ class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest } } + "A TinyJoinAndMergeJob" should { + import TinyJoinAndMergeJob._ + + "merge and joinWithTiny shouldn't duplicate data (2)" in { + HadoopPlatformJobTest(new TinyJoinAndMergeJob2(_), cluster) + .source(peopleInput, peopleData) + .source(messageInput, messageData) + .sink(output) { _.toSet shouldBe (outputData.toSet) } + .run + } + } + + "A TinyJoinAndMergeJob" should { + import TinyJoinAndMergeJob3._ + + "merge and joinWithTiny shouldn't duplicate data (3)" in { + HadoopPlatformJobTest(new TinyJoinAndMergeJob3(_), cluster) + .source(peopleInput, peopleData) + .source(messageInput, messageData) + .source(peopleInput3, peopleData3) + .sink(output) { _.toSet shouldBe (outputData.toSet) } + .run + } + } + + /************** END RFC SECTION ***************/ "A TsvNoCacheJob" should { import TsvNoCacheJob._ @@ -319,38 +391,19 @@ class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest } - "A TypedPipeForceToDiskWithDescriptionPipe" should { - "have a custom step name from withDescription" in { - HadoopPlatformJobTest(new TypedPipeForceToDiskWithDescriptionJob(_), cluster) - .inspectCompletedFlow { flow => - val steps = flow.getFlowSteps.asScala - val firstStep = steps.filter(_.getName.startsWith("(1/2")) - val secondStep = steps.filter(_.getName.startsWith("(2/2")) - val lab1 = firstStep.map(_.getConfig.get(Config.StepDescriptions)) - lab1 should have size 1 - lab1(0) should include ("write words to disk") - val lab2 = secondStep.map(_.getConfig.get(Config.StepDescriptions)) - lab2 should have size 1 - lab2(0) should include ("output frequency by length") - } - .run - } - } - "A TypedPipeJoinWithDescriptionPipe" should { "have a custom step name from withDescription" in { HadoopPlatformJobTest(new TypedPipeJoinWithDescriptionJob(_), cluster) .inspectCompletedFlow { flow => val steps = flow.getFlowSteps.asScala steps should have size 1 - val firstStep = steps.headOption.map(_.getConfig.get(Config.StepDescriptions)).getOrElse("") - val lines = List(147, 150, 154).map { i => - s"com.twitter.scalding.platform.TypedPipeJoinWithDescriptionJob.(PlatformTest.scala:$i" - } - firstStep should include ("leftJoin") - firstStep should include ("hashJoin") - lines.foreach { l => firstStep should include (l) } - steps.map(_.getConfig.get(Config.StepDescriptions)).foreach(s => info(s)) + val firstStepDescs = steps.headOption.map(_.getConfigValue(Config.StepDescriptions)).getOrElse("") + val firstStepDescSet = firstStepDescs.split(",").map(_.trim).toSet + + val expected = Set(190, 192, 193, 196, 197).map(linenum => /* WARNING: keep aligned with line numbers above */ + s"com.twitter.scalding.platform.TypedPipeJoinWithDescriptionJob.(PlatformTest.scala:${linenum})") ++ Seq("leftJoin", "hashJoin") + firstStepDescSet should equal(expected) + steps.map(_.getConfigValue(Config.StepDescriptions)).foreach(s => info(s)) } .run } @@ -361,18 +414,16 @@ class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest HadoopPlatformJobTest(new TypedPipeWithDescriptionJob(_), cluster) .inspectCompletedFlow { flow => val steps = flow.getFlowSteps.asScala - val descs = List("map stage - assign words to 1", + val expectedDescs = Set("map stage - assign words to 1", "reduce stage - sum", - "write", - // should see the .group and the .write show up as line numbers - "com.twitter.scalding.platform.TypedPipeWithDescriptionJob.(PlatformTest.scala:137)", - "com.twitter.scalding.platform.TypedPipeWithDescriptionJob.(PlatformTest.scala:141)") - - val foundDescs = steps.map(_.getConfig.get(Config.StepDescriptions)) - descs.foreach { d => - assert(foundDescs.size == 1) - assert(foundDescs(0).contains(d)) - } + "write") ++ + Seq(180, 179, 182, 183, 184).map( /* WARNING: keep aligned with line numbers above */ + linenum => s"com.twitter.scalding.platform.TypedPipeWithDescriptionJob.(PlatformTest.scala:${linenum})") + + val foundDescs = steps.map(_.getConfigValue(Config.StepDescriptions).split(",").map(_.trim).toSet) + foundDescs should have size 1 + + foundDescs.head should equal(expectedDescs) //steps.map(_.getConfig.get(Config.StepDescriptions)).foreach(s => info(s)) } .run @@ -385,19 +436,19 @@ class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest "distinct properly from normal data" in { HadoopPlatformJobTest(new NormalDistinctJob(_), cluster) .source[String]("input", data ++ data ++ data) - .sink[String]("output") { _.toList shouldBe data } + .sink[String]("output") { _.toList.sorted shouldBe data.sorted } // important: don't do set comparisons or "distinct" issues might be missed .run } "distinctBy(identity) properly from a list in memory" in { HadoopPlatformJobTest(new IterableSourceDistinctIdentityJob(_), cluster) - .sink[String]("output") { _.toList shouldBe data } + .sink[String]("output") { _.toList.sorted shouldBe data.sorted } // important: don't do set comparisons or "distinct" issues might be missed .run } "distinct properly from a list" in { HadoopPlatformJobTest(new IterableSourceDistinctJob(_), cluster) - .sink[String]("output") { _.toList shouldBe data } + .sink[String]("output") { _.toList.sorted shouldBe data.sorted } // important: don't do set comparisons or "distinct" issues might be missed .run } } diff --git a/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/StepwisePlatformTest.scala b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/StepwisePlatformTest.scala new file mode 100644 index 0000000000..f774272ad3 --- /dev/null +++ b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/StepwisePlatformTest.scala @@ -0,0 +1,47 @@ +package com.twitter.scalding.platform + +import cascading.flow.FlowStep +import cascading.flow.planner.BaseFlowStep +import cascading.pipe.joiner.{ JoinerClosure, InnerJoin } +import cascading.tuple.Tuple + +import com.twitter.scalding._ +import com.twitter.scalding.serialization.OrderedSerialization +import java.util.{ Iterator => JIterator } +import org.scalacheck.{ Arbitrary, Gen } +import org.scalatest.{ Ignore, Matchers, WordSpec } +import org.slf4j.{ LoggerFactory, Logger } +import scala.collection.JavaConverters._ +import scala.language.experimental.macros +import scala.math.Ordering + +/** + * This trait includes tests which only make sense on platforms where each (or most) Cascading + * step is translated as an individual job on the underlying execution fabric + * (e.g. Hadoop MAPREDUCE) + * + * + */ +trait StepwisePlatformTest extends PlatformTest { + import ConfigBridge._ + + "A TypedPipeForceToDiskWithDescriptionPipe" should { + "have a custom step name from withDescription" in { + + HadoopPlatformJobTest(new TypedPipeForceToDiskWithDescriptionJob(_), cluster) + .inspectCompletedFlow { flow => + val steps = flow.getFlowSteps.asScala + val firstStep = steps.filter(_.getName.startsWith("(1/2)")) + val secondStep = steps.filter(_.getName.startsWith("(2/2)")) + val lab1 = firstStep.map(_.getConfigValue(Config.StepDescriptions)) + lab1 should have size 1 + lab1(0) should include ("write words to disk") + val lab2 = secondStep.map(_.getConfigValue(Config.StepDescriptions)) + lab2 should have size 1 + lab2(0) should include ("output frequency by length") + } + .run + } + } + +} diff --git a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimatorTest.scala b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimatorTest.scala similarity index 89% rename from scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimatorTest.scala rename to scalding-hadoop-test/src/main/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimatorTest.scala index c467a0de7c..0bb3e09698 100644 --- a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimatorTest.scala +++ b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimatorTest.scala @@ -114,7 +114,7 @@ class InvalidHistoryBasedEstimator extends RatioBasedEstimator { override val historyService = InvalidHistoryService } -class RatioBasedReducerEstimatorTest extends WordSpec with Matchers with HadoopSharedPlatformTest { +trait RatioBasedReducerEstimatorTest extends WordSpec with Matchers with HadoopSharedPlatformTest { import HipJob._ "Single-step job with ratio-based reducer estimator" should { @@ -128,8 +128,10 @@ class RatioBasedReducerEstimatorTest extends WordSpec with Matchers with HadoopS val steps = flow.getFlowSteps.asScala steps should have size 1 - val conf = steps.head.getConfig - conf.getNumReduceTasks should equal (1) // default + val conf = Config.fromHadoop(steps.head.getConfig) + + val numReducers = conf.getNumReducers + conf.getNumReducers.get should equal (1) // default } .run } @@ -144,8 +146,8 @@ class RatioBasedReducerEstimatorTest extends WordSpec with Matchers with HadoopS val steps = flow.getFlowSteps.asScala steps should have size 1 - val conf = steps.head.getConfig - conf.getNumReduceTasks should equal (1) // default + val conf = Config.fromHadoop(steps.head.getConfig) + conf.getNumReducers.get should equal (1) // default } .run } @@ -164,8 +166,10 @@ class RatioBasedReducerEstimatorTest extends WordSpec with Matchers with HadoopS // base estimate from input size reducer = 3 // reducer ratio from history = 0.5 // final estimate = ceil(3 * 0.5) = 2 - val conf = steps.head.getConfig - conf.getNumReduceTasks should equal (2) + val conf = Config.fromHadoop(steps.head.getConfig) + + val numReducers = conf.getNumReducers + conf.getNumReducers.get should equal (2) } .run } @@ -180,8 +184,10 @@ class RatioBasedReducerEstimatorTest extends WordSpec with Matchers with HadoopS val steps = flow.getFlowSteps.asScala steps should have size 1 - val conf = steps.head.getConfig - conf.getNumReduceTasks should equal (1) // default + val conf = Config.fromHadoop(steps.head.getConfig) + + val numReducers = conf.getNumReducers + conf.getNumReducers.get should equal (1) // default } .run } diff --git a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/ReducerEstimatorTest.scala b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/reducer_estimation/ReducerEstimatorTest.scala similarity index 96% rename from scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/ReducerEstimatorTest.scala rename to scalding-hadoop-test/src/main/scala/com/twitter/scalding/reducer_estimation/ReducerEstimatorTest.scala index 35f7bbf85d..4032aaa10b 100644 --- a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/ReducerEstimatorTest.scala +++ b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/reducer_estimation/ReducerEstimatorTest.scala @@ -84,7 +84,7 @@ class SimpleMapOnlyJob(args: Args, customConfig: Config) extends Job(args) { .write(TypedTsv[String]("mapped_output")) } -class ReducerEstimatorTest extends WordSpec with Matchers with HadoopSharedPlatformTest { +trait ReducerEstimatorTest extends WordSpec with Matchers with HadoopSharedPlatformTest { import HipJob._ "Single-step job with reducer estimator" should { @@ -146,7 +146,7 @@ class ReducerEstimatorTest extends WordSpec with Matchers with HadoopSharedPlatf .sink[Double](out)(_.head shouldBe 2.86 +- 0.0001) .inspectCompletedFlow { flow => val steps = flow.getFlowSteps.asScala - val reducers = steps.map(_.getConfig.getInt(Config.HadoopNumReducers, 0)).toList + val reducers = steps.map(step => Config.fromHadoop(step.getConfig).getNumReducers.getOrElse(0)).toList reducers shouldBe List(3, 1, 1) } .run diff --git a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimatorTest.scala b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimatorTest.scala similarity index 90% rename from scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimatorTest.scala rename to scalding-hadoop-test/src/main/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimatorTest.scala index 71f460c26a..223ba24b73 100644 --- a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimatorTest.scala +++ b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/reducer_estimation/RuntimeReducerEstimatorTest.scala @@ -34,7 +34,7 @@ class DummyEstimator extends ReducerEstimator { def estimateReducers(info: FlowStrategyInfo) = Some(42) } -class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopSharedPlatformTest { +trait RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopSharedPlatformTest { "Single-step job with runtime-based reducer estimator" should { "set reducers correctly with median estimation scheme" in { @@ -49,7 +49,7 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar val steps = flow.getFlowSteps.asScala assert(steps.length == 1) - val conf = steps.head.getConfig + val conf = Config.fromHadoop(steps.head.getConfig) // So our histories are (taking median runtimes): // // 2 * inputSize bytes, 3 reducers * 1000 ms for each reducer @@ -66,7 +66,7 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar // so we anticipate that processing (inputSize bytes) // will take 1500 ms total. // To do this in 25 ms, we need 60 reducers. - assert(conf.getNumReduceTasks == 60) + assert(conf.getNumReducers.get == 60) } .run } @@ -83,7 +83,7 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar val steps = flow.getFlowSteps.asScala assert(steps.length == 1) - val conf = steps.head.getConfig + val conf = Config.fromHadoop(steps.head.getConfig) // So our histories are (taking mean runtimes): // // 2 * inputSize bytes, 3 reducers * 1336.67 ms for each reducer @@ -101,7 +101,7 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar // will take 1525.8 ms total. // // To do this in 25 ms, we need 61.03 reducers, which rounds up to 62. - assert(conf.getNumReduceTasks == 62) + assert(conf.getNumReducers.get == 62) } .run } @@ -118,7 +118,7 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar val steps = flow.getFlowSteps.asScala assert(steps.length == 1) - val conf = steps.head.getConfig + val conf = Config.fromHadoop(steps.head.getConfig) // So our histories are (taking mean runtimes): // // 2 * inputSize bytes, 3 reducers * 1337 ms for each reducer @@ -131,7 +131,7 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar // that the job will take 3342 ms total. // // To do this in 25 ms, we need 134 reducers. - assert(conf.getNumReduceTasks == 134) + assert(conf.getNumReducers.get == 134) } .run } @@ -148,7 +148,7 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar val steps = flow.getFlowSteps.asScala assert(steps.length == 1) - val conf = steps.head.getConfig + val conf = Config.fromHadoop(steps.head.getConfig) // So our histories are (taking median runtimes): // // 2 * inputSize bytes, 3 reducers * 1000 ms for each reducer @@ -161,7 +161,7 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar // that the job will take 3000 ms total. // // To do this in 25 ms, we need 120 reducers. - assert(conf.getNumReduceTasks == 120) + assert(conf.getNumReducers.get == 120) } .run } @@ -176,12 +176,12 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar val steps = flow.getFlowSteps.asScala assert(steps.length == 1) - val conf = steps.head.getConfig + val conf = Config.fromHadoop(steps.head.getConfig) // EmptyRuntimeEstimator should have returned None, // so it should have fallen back to DummyEstimator, // which returns 42. - assert(conf.getNumReduceTasks == 42) + assert(conf.getNumReducers.get == 42) } } @@ -195,12 +195,12 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar val steps = flow.getFlowSteps.asScala assert(steps.length == 1) - val conf = steps.head.getConfig + val conf = Config.fromHadoop(steps.head.getConfig) // ErrorRuntimeEstimator should have returned None, // so it should have fallen back to DummyEstimator, // which returns 42. - assert(conf.getNumReduceTasks == 42) + assert(conf.getNumReducers.get == 42) } } } diff --git a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/HadoopFabricTest.scala b/scalding-hadoop-test/src/test/scala/com/twitter/scalding/HadoopFabricTest.scala new file mode 100644 index 0000000000..e7b3794dcf --- /dev/null +++ b/scalding-hadoop-test/src/test/scala/com/twitter/scalding/HadoopFabricTest.scala @@ -0,0 +1,14 @@ +package com.twitter.scalding + +import com.twitter.scalding.platform.StepwisePlatformTest +import com.twitter.scalding.reducer_estimation.{ RuntimeReducerEstimatorTest, ReducerEstimatorTest, RatioBasedReducerEstimatorTest } + +// Keeping all of the specifications in the same tests puts the result output all together at the end. +// This is useful given that the Hadoop MiniMRCluster and MiniDFSCluster spew a ton of logging. +class HadoopFabricTest + extends StepwisePlatformTest + with RatioBasedReducerEstimatorTest + with ReducerEstimatorTest + with RuntimeReducerEstimatorTest { + /* just realizing here the tests in a Hadooop (1.x API) context, using cascading-hadoop */ +} \ No newline at end of file diff --git a/scalding-hadoop2-mr1-test/src/test/scala/com/twitter/scalding/Hadoop2MR1FabricTest.scala b/scalding-hadoop2-mr1-test/src/test/scala/com/twitter/scalding/Hadoop2MR1FabricTest.scala new file mode 100644 index 0000000000..a931390efe --- /dev/null +++ b/scalding-hadoop2-mr1-test/src/test/scala/com/twitter/scalding/Hadoop2MR1FabricTest.scala @@ -0,0 +1,14 @@ +package com.twitter.scalding + +import com.twitter.scalding.platform.StepwisePlatformTest +import com.twitter.scalding.reducer_estimation.{ RuntimeReducerEstimatorTest, ReducerEstimatorTest, RatioBasedReducerEstimatorTest } + +// Keeping all of the specifications in the same tests puts the result output all together at the end. +// This is useful given that the Hadoop MiniMRCluster and MiniDFSCluster spew a ton of logging. +class Hadoop2MR1FabricTest + extends StepwisePlatformTest + with RatioBasedReducerEstimatorTest + with ReducerEstimatorTest + with RuntimeReducerEstimatorTest { + /* just realizing here the tests in a Hadooop+MAPREDUCE (2.x API) context, using cascading-hadoop2-mr1 */ +} diff --git a/scalding-hadoop2-tez-test/src/test/resources/log4j.properties b/scalding-hadoop2-tez-test/src/test/resources/log4j.properties new file mode 100644 index 0000000000..d4e857c374 --- /dev/null +++ b/scalding-hadoop2-tez-test/src/test/resources/log4j.properties @@ -0,0 +1,19 @@ + +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + +log4j.appender.CLA=org.apache.log4j.AsyncAppender +log4j.appender.CLA.AppenderRef=stdout + + +log4j.logger.cascading=INFO +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.apache.tez=WARN +log4j.logger.BlockStateChange=WARN +log4j.logger.SecurityLogger=WARN +log4j.logger.org.mortbay=WARN \ No newline at end of file diff --git a/scalding-hadoop2-tez-test/src/test/scala/com/twitter/scalding/Hadoop2TezFabricTest.scala b/scalding-hadoop2-tez-test/src/test/scala/com/twitter/scalding/Hadoop2TezFabricTest.scala new file mode 100644 index 0000000000..f463360f44 --- /dev/null +++ b/scalding-hadoop2-tez-test/src/test/scala/com/twitter/scalding/Hadoop2TezFabricTest.scala @@ -0,0 +1,36 @@ +package com.twitter.scalding + +import cascading.pipe.assembly.AggregateByProps +import com.twitter.scalding.platform.DagwisePlatformTest +import com.twitter.scalding.reducer_estimation.{ RuntimeReducerEstimatorTest, ReducerEstimatorTest, RatioBasedReducerEstimatorTest } +import org.apache.tez.dag.api.TezConfiguration +import cascading.flow.FlowRuntimeProps + +// Keeping all of the specifications in the same tests puts the result output all together at the end. +// This is useful given that the Hadoop MiniMRCluster and MiniDFSCluster spew a ton of logging. +class Hadoop2TezFabricTest + extends DagwisePlatformTest /*with RatioBasedReducerEstimatorTest + with ReducerEstimatorTest + with RuntimeReducerEstimatorTest */ { + /* just realizing here the tests in a Tez context, using cascading-hadoop2-tez */ + + override def initialize(): cluster.type = { + + val tempdir = if (Option(System.getProperty("hadoop.tmp.dir")).getOrElse("").isEmpty) "build/test/tmp" else System.getProperty("hadoop.tmp.dir") + + cluster.initialize(Config.empty + + (TezConfiguration.TEZ_LOCAL_MODE, "true") + + ("tez.runtime.optimize.local.fetch" -> "true") + + (TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS -> "3") + + (TezConfiguration.TEZ_IGNORE_LIB_URIS -> "true") + + (TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS -> "true") + + (TezConfiguration.TEZ_AM_SESSION_MODE -> "true") // allows multiple TezClient instances to be used in a single jvm + + ("hadoop.tmp.dir" -> tempdir) + + ("mapred.mapper.new-api" -> { + if (classOf[org.apache.hadoop.mapred.InputFormat[_, _]].isAssignableFrom(classOf[com.twitter.maple.tap.TupleMemoryInputFormat])) "false" + else if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]].isAssignableFrom(classOf[com.twitter.maple.tap.TupleMemoryInputFormat])) "true" + else ??? + }) /* we are using c.t.maple.tap.MemorySourceTap, which Cascading can't identify as being in the old or new API */ + + (cascading.flow.FlowRuntimeProps.GATHER_PARTITIONS -> "4") /* a value must be provided */ ) + } +} diff --git a/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/reducer_estimation/HRavenHistoryService.scala b/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/reducer_estimation/HRavenHistoryService.scala index 0e7b0ffa68..2e5b9a58a5 100644 --- a/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/reducer_estimation/HRavenHistoryService.scala +++ b/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/reducer_estimation/HRavenHistoryService.scala @@ -120,12 +120,12 @@ object HRavenHistoryService extends HistoryService { */ def fetchPastJobDetails(step: FlowStep[JobConf], max: Int): Try[Seq[JobDetails]] = { val conf = step.getConfig - val stepNum = step.getStepNum + val stepId = step.getID def findMatchingJobStep(pastFlow: Flow) = pastFlow.getJobs.asScala.find { step => try { - step.getConfiguration.get("cascading.flow.step.num").toInt == stepNum + step.getConfiguration.get("cascading.flow.step.id") == stepId } catch { case _: NumberFormatException => false } diff --git a/scalding-json/src/main/scala/com/twitter/scalding/TypedJson.scala b/scalding-json/src/main/scala/com/twitter/scalding/TypedJson.scala index 1dde3d1648..582b74237d 100644 --- a/scalding-json/src/main/scala/com/twitter/scalding/TypedJson.scala +++ b/scalding-json/src/main/scala/com/twitter/scalding/TypedJson.scala @@ -2,7 +2,7 @@ package com.twitter.scalding import com.twitter.bijection.{ Injection, AbstractInjection } import com.twitter.bijection.Inversion._ -import com.twitter.elephantbird.cascading2.scheme.LzoTextLine +import com.twitter.elephantbird.cascading3.scheme.LzoTextLine import org.json4s._ import org.json4s.native.Serialization._ diff --git a/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeScheme.java b/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeScheme.java index a4ef0bb2f6..39c902e41b 100644 --- a/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeScheme.java +++ b/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeScheme.java @@ -51,7 +51,7 @@ public ParquetScroogeScheme(ParquetValueScheme.Config config) { } @Override - public void sinkConfInit(FlowProcess fp, + public void sinkConfInit(FlowProcess fp, Tap tap, JobConf jobConf) { DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf); ParquetOutputFormat.setWriteSupportClass(jobConf, ScroogeWriteSupport.class); @@ -59,7 +59,7 @@ public void sinkConfInit(FlowProcess fp, } @Override - public void sourceConfInit(FlowProcess fp, + public void sourceConfInit(FlowProcess fp, Tap tap, JobConf jobConf) { super.sourceConfInit(fp, tap, jobConf); jobConf.setInputFormat(DeprecatedParquetInputFormat.class); diff --git a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/Parquet346ScroogeScheme.scala b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/Parquet346ScroogeScheme.scala index fe1444f222..84b8aab370 100644 --- a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/Parquet346ScroogeScheme.scala +++ b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/Parquet346ScroogeScheme.scala @@ -31,7 +31,7 @@ import scala.util.control.NonFatal class Parquet346ScroogeScheme[T <: ThriftStruct](config: ParquetValueScheme.Config[T]) extends ParquetScroogeScheme[T](config) { - override def sourceConfInit(fp: FlowProcess[JobConf], + override def sourceConfInit(fp: FlowProcess[_ <: JobConf], tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], jobConf: JobConf): Unit = { diff --git a/scalding-parquet/.gitignore b/scalding-parquet/.gitignore new file mode 100644 index 0000000000..0beb3e581c --- /dev/null +++ b/scalding-parquet/.gitignore @@ -0,0 +1 @@ +build/test diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/Parquet346TBaseScheme.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/Parquet346TBaseScheme.scala index 4e6a4f9235..c482f5bcf7 100644 --- a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/Parquet346TBaseScheme.scala +++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/Parquet346TBaseScheme.scala @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._ class Parquet346TBaseScheme[T <: TBase[_, _]](config: ParquetValueScheme.Config[T]) extends ParquetTBaseScheme[T](config) { - override def sourceConfInit(fp: FlowProcess[JobConf], + override def sourceConfInit(fp: FlowProcess[_ <: JobConf], tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], jobConf: JobConf): Unit = { diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/tuple/scheme/TypedParquetTupleScheme.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/tuple/scheme/TypedParquetTupleScheme.scala index 3e06b8e107..f604dfb421 100644 --- a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/tuple/scheme/TypedParquetTupleScheme.scala +++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/tuple/scheme/TypedParquetTupleScheme.scala @@ -140,14 +140,14 @@ class TypedParquetTupleScheme[T](val readSupport: ParquetReadSupport[T], val wri type SourceCallType = SourceCall[Array[AnyRef], Reader] type SinkCallType = SinkCall[Array[AnyRef], Output] - override def sourceConfInit(flowProcess: FlowProcess[JobConf], tap: TapType, jobConf: JobConf): Unit = { + override def sourceConfInit(flowProcess: FlowProcess[_ <: JobConf], tap: TapType, jobConf: JobConf): Unit = { fp.map(ParquetInputFormat.setFilterPredicate(jobConf, _)) jobConf.setInputFormat(classOf[DeprecatedParquetInputFormat[T]]) jobConf.set(ParquetInputOutputFormat.READ_SUPPORT_INSTANCE, ParquetInputOutputFormat.injection(readSupport)) ParquetInputFormat.setReadSupportClass(jobConf, classOf[ReadSupportInstanceProxy[_]]) } - override def source(flowProcess: FlowProcess[JobConf], sc: SourceCallType): Boolean = { + override def source(flowProcess: FlowProcess[_ <: JobConf], sc: SourceCallType): Boolean = { val value: Container[T] = sc.getInput.createValue() val hasNext = sc.getInput.next(null, value) @@ -161,12 +161,12 @@ class TypedParquetTupleScheme[T](val readSupport: ParquetReadSupport[T], val wri } } - override def sinkConfInit(flowProcess: FlowProcess[JobConf], tap: TapType, jobConf: JobConf): Unit = { + override def sinkConfInit(flowProcess: FlowProcess[_ <: JobConf], tap: TapType, jobConf: JobConf): Unit = { jobConf.setOutputFormat(classOf[InnerDeprecatedParquetOutputFormat[T]]) jobConf.set(ParquetInputOutputFormat.WRITE_SUPPORT_INSTANCE, ParquetInputOutputFormat.injection(writeSupport)) } - override def sink(flowProcess: FlowProcess[JobConf], sinkCall: SinkCallType): Unit = { + override def sink(flowProcess: FlowProcess[_ <: JobConf], sinkCall: SinkCallType): Unit = { val tuple = sinkCall.getOutgoingEntry require(tuple.size == 1, "TypedParquetTupleScheme expects tuple with an arity of exactly 1, but found " + tuple.getFields) diff --git a/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/tuple/TypedParquetTupleTest.scala b/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/tuple/TypedParquetTupleTest.scala index e7f0eb4764..3b43fad6c3 100644 --- a/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/tuple/TypedParquetTupleTest.scala +++ b/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/tuple/TypedParquetTupleTest.scala @@ -1,7 +1,7 @@ package com.twitter.scalding.parquet.tuple import com.twitter.scalding.parquet.tuple.macros.Macros._ -import com.twitter.scalding.platform.{ HadoopPlatformJobTest, HadoopPlatformTest } +import com.twitter.scalding.platform.{ PlatformTest, HadoopPlatformJobTest } import com.twitter.scalding.typed.TypedPipe import com.twitter.scalding.{ Args, Job, TypedTsv } import org.scalatest.{ Matchers, WordSpec } @@ -9,7 +9,7 @@ import org.apache.parquet.filter2.predicate.FilterApi.binaryColumn import org.apache.parquet.filter2.predicate.{ FilterApi, FilterPredicate } import org.apache.parquet.io.api.Binary -class TypedParquetTupleTest extends WordSpec with Matchers with HadoopPlatformTest { +class TypedParquetTupleTest extends WordSpec with Matchers with PlatformTest { "TypedParquetTuple" should { "read and write correctly" in { diff --git a/scalding-thrift-macros/.gitignore b/scalding-thrift-macros/.gitignore new file mode 100644 index 0000000000..0beb3e581c --- /dev/null +++ b/scalding-thrift-macros/.gitignore @@ -0,0 +1 @@ +build/test diff --git a/scripts/scald.rb b/scripts/scald.rb index 62dc767c2b..45d201a580 100755 --- a/scripts/scald.rb +++ b/scripts/scald.rb @@ -262,7 +262,10 @@ def get_dep_location(org, dep, version) if (!CONFIG["jar"]) #what jar has all the dependencies for this job - CONFIG["jar"] = repo_root + "/scalding-core/target/scala-#{SHORT_SCALA_VERSION}/scalding-core-assembly-#{SCALDING_VERSION}.jar" + CONFIG["jar"] = repo_root + "/scalding-core/target/scala-#{SHORT_SCALA_VERSION}/scalding-core-assembly-#{SCALDING_VERSION}.jar" + EXTRACP = repo_root + "/scalding-repl/target/scala-#{SHORT_SCALA_VERSION}/scalding-repl-assembly-#{SCALDING_VERSION}.jar" +else + EXTRACP = "" end #Check that we can find the jar: @@ -508,7 +511,7 @@ def needs_rebuild? def build_job_jar $stderr.puts("compiling " + JOBFILE) FileUtils.mkdir_p(BUILDDIR) - classpath = (([LIBCP, JARPATH, MODULEJARPATHS, CLASSPATH].select { |s| s != "" }) + convert_dependencies_to_jars).flatten.join(":") + classpath = (([LIBCP, JARPATH, MODULEJARPATHS, EXTRACP, CLASSPATH].select { |s| s != "" }) + convert_dependencies_to_jars).flatten.join(":") puts("#{file_type}c -classpath #{classpath} -d #{BUILDDIR} #{JOBFILE}") unless system("#{COMPILE_CMD} -classpath #{classpath} -d #{BUILDDIR} #{JOBFILE}") puts "[SUGGESTION]: Try scald.rb --clean, you may have corrupt jars lying around" @@ -523,7 +526,7 @@ def build_job_jar end def hadoop_classpath - (["/usr/share/java/hadoop-lzo-0.4.15.jar", JARBASE, MODULEJARPATHS.map{|n| File.basename(n)}, "job-jars/#{JOBJAR}"].select { |s| s != "" }).flatten.join(":") + (["/usr/share/java/hadoop-lzo-0.4.15.jar", JARBASE, EXTRACP, MODULEJARPATHS.map{|n| File.basename(n)}, "job-jars/#{JOBJAR}"].select { |s| s != "" }).flatten.join(":") end def hadoop_command @@ -570,7 +573,7 @@ def local_cmd(mode) [] end - classpath = ([JARPATH, MODULEJARPATHS].select { |s| s != "" } + convert_dependencies_to_jars + localHadoopDepPaths).flatten.join(":") + (is_file? ? ":#{JOBJARPATH}" : "") + + classpath = ([JARPATH, EXTRACP, MODULEJARPATHS].select { |s| s != "" } + convert_dependencies_to_jars + localHadoopDepPaths).flatten.join(":") + (is_file? ? ":#{JOBJARPATH}" : "") + ":" + CLASSPATH "java -Xmx#{LOCALMEM} -cp #{classpath} #{TOOL} #{JOB} #{mode} #{JOB_ARGS}" end