From fb887e64716e6ba4c274dd4717e4f82666a2afee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?= Date: Fri, 6 Mar 2015 10:24:03 +0100 Subject: [PATCH 1/6] Make Cascading Fabric selection configurable, no longer hardcoding HadoopFlowConnector --- .../scala/com/twitter/scalding/Mode.scala | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala index 71e539fc3d..a4fcd31ee1 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala @@ -22,9 +22,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{ FileSystem, Path } import org.apache.hadoop.mapred.JobConf -import cascading.flow.FlowConnector -import cascading.flow.hadoop.HadoopFlowProcess -import cascading.flow.hadoop.HadoopFlowConnector +import cascading.flow.{ FlowProcess, FlowConnector, FlowDef, Flow } import cascading.flow.local.LocalFlowConnector import cascading.flow.local.LocalFlowProcess import cascading.property.AppProps @@ -73,10 +71,22 @@ object Mode { if (args.boolean("local")) Local(strictSources) - else if (args.boolean("hdfs")) + else if (args.boolean("hdfs")) /* FIXME: should we start printing deprecation warnings ? It's okay to set manually c.f.*.class though */ Hdfs(strictSources, config) - else - throw ArgsException("[ERROR] Mode must be one of --local or --hdfs, you provided neither") + else if (args.boolean("hadoop1")) { + config.set("cascading.flow.connector.class", "cascading.flow.hadoop.HadoopFlowConnector") + config.set("cascading.flow.process.class", "cascading.flow.hadoop.HadoopFlowProcess") + Hdfs(strictSources, config) + } else if (args.boolean("hadoop2-mr1")) { + config.set("cascading.flow.connector.class", "cascading.flow.hadoop2.Hadoop2MR1FlowConnector") + config.set("cascading.flow.process.class", "cascading.flow.hadoop.HadoopFlowProcess") // no Hadoop2MR1FlowProcess as of Cascading 3.0.0-wip-75 + Hdfs(strictSources, config) + } else if (args.boolean("hadoop2-tez")) { + config.set("cascading.flow.connector.class", "cascading.flow.tez.Hadoop2TezFlowConnector") + config.set("cascading.flow.process.class", "cascading.flow.tez.Hadoop2TezFlowProcess") + Hdfs(strictSources, config) + } else + throw ArgsException("[ERROR] Mode must be one of --local, --hadoop1, --hadoop2-mr1, --hadoop2-tez or --hdfs, you provided none") } } @@ -116,7 +126,9 @@ trait HadoopMode extends Mode { asMap - jarKey case None => asMap } - new HadoopFlowConnector(finalMap.asJava) + val clazz = Class.forName(jobConf.get("cascading.flow.connector.class", "cascading.flow.hadoop.HadoopFlowConnector")) + val ctor = clazz.getConstructor(classOf[java.util.Map[_, _]]) + ctor.newInstance(finalMap.asJava).asInstanceOf[FlowConnector] } // TODO unlike newFlowConnector, this does not look at the Job.config @@ -125,7 +137,9 @@ trait HadoopMode extends Mode { val conf = new JobConf(true) // initialize the default config // copy over Config config.toMap.foreach{ case (k, v) => conf.set(k, v) } - val fp = new HadoopFlowProcess(conf) + val clazz = Class.forName(jobConf.get("cascading.flow.process.class", "cascading.flow.hadoop.HadoopFlowProcess")) + val ctor = clazz.getConstructor(classOf[java.util.Map[_, _]]) + val fp = ctor.newInstance(conf).asInstanceOf[FlowProcess[JobConf]] htap.retrieveSourceFields(fp) htap.sourceConfInit(fp, conf) htap.openForRead(fp) From 66cab848a982f8cb6c1116683547d898ebf572dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?= Date: Fri, 6 Mar 2015 11:30:45 +0100 Subject: [PATCH 2/6] HadoopFlowProcess exposes a JobConf constructor --- scalding-core/src/main/scala/com/twitter/scalding/Mode.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala index a4fcd31ee1..866b3fda92 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala @@ -138,7 +138,7 @@ trait HadoopMode extends Mode { // copy over Config config.toMap.foreach{ case (k, v) => conf.set(k, v) } val clazz = Class.forName(jobConf.get("cascading.flow.process.class", "cascading.flow.hadoop.HadoopFlowProcess")) - val ctor = clazz.getConstructor(classOf[java.util.Map[_, _]]) + val ctor = clazz.getConstructor(classOf[JobConf]) val fp = ctor.newInstance(conf).asInstanceOf[FlowProcess[JobConf]] htap.retrieveSourceFields(fp) htap.sourceConfInit(fp, conf) From a549a545851291f2b9c9203dd63f46c2a4b8a100 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?= Date: Mon, 9 Mar 2015 14:09:25 +0100 Subject: [PATCH 3/6] More explicit errors in case of failure to load the appropriate fabric --- .../scala/com/twitter/scalding/Mode.scala | 57 +++++++++++++++---- .../scala/com/twitter/scalding/XHandler.scala | 5 +- .../com/twitter/scalding/XHandlerTest.scala | 4 ++ 3 files changed, 53 insertions(+), 13 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala index 866b3fda92..d63d46ad9b 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala @@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory case class ModeException(message: String) extends RuntimeException(message) +case class ModeLoadException(message: String, origin: NoClassDefFoundError) extends RuntimeException(origin) + object Mode { /** * This is a Args and a Mode together. It is used purely as @@ -61,6 +63,18 @@ object Mode { case _ => None } + val CascadingFlowConnectorClassKey = "cascading.flow.connector.class" + val CascadingFlowProcessClassKey = "cascading.flow.process.class" + + val DefaultHadoopFlowConnector = "cascading.flow.hadoop.HadoopFlowConnector" + val DefaultHadoopFlowProcess = "cascading.flow.hadoop.HadoopFlowProcess" + + val DefaultHadoop2Mr1FlowConnector = "cascading.flow.hadoop2.Hadoop2MR1FlowConnector" + val DefaultHadoop2Mr1FlowProcess = "cascading.flow.hadoop.HadoopFlowProcess" // no Hadoop2MR1FlowProcess as of Cascading 3.0.0-wip-75? + + val DefaultHadoop2TezFlowConnector = "cascading.flow.tez.Hadoop2TezFlowConnector" + val DefaultHadoop2TezFlowProcess = "cascading.flow.tez.Hadoop2TezFlowProcess" + // This should be passed ALL the args supplied after the job name def apply(args: Args, config: Configuration): Mode = { val strictSources = args.boolean("tool.partialok") == false @@ -74,16 +88,16 @@ object Mode { else if (args.boolean("hdfs")) /* FIXME: should we start printing deprecation warnings ? It's okay to set manually c.f.*.class though */ Hdfs(strictSources, config) else if (args.boolean("hadoop1")) { - config.set("cascading.flow.connector.class", "cascading.flow.hadoop.HadoopFlowConnector") - config.set("cascading.flow.process.class", "cascading.flow.hadoop.HadoopFlowProcess") + config.set(CascadingFlowConnectorClassKey, DefaultHadoopFlowConnector) + config.set(CascadingFlowProcessClassKey, DefaultHadoopFlowProcess) Hdfs(strictSources, config) } else if (args.boolean("hadoop2-mr1")) { - config.set("cascading.flow.connector.class", "cascading.flow.hadoop2.Hadoop2MR1FlowConnector") - config.set("cascading.flow.process.class", "cascading.flow.hadoop.HadoopFlowProcess") // no Hadoop2MR1FlowProcess as of Cascading 3.0.0-wip-75 + config.set(CascadingFlowConnectorClassKey, DefaultHadoop2Mr1FlowConnector) + config.set(CascadingFlowProcessClassKey, DefaultHadoop2Mr1FlowProcess) Hdfs(strictSources, config) } else if (args.boolean("hadoop2-tez")) { - config.set("cascading.flow.connector.class", "cascading.flow.tez.Hadoop2TezFlowConnector") - config.set("cascading.flow.process.class", "cascading.flow.tez.Hadoop2TezFlowProcess") + config.set(CascadingFlowConnectorClassKey, DefaultHadoop2TezFlowConnector) + config.set(CascadingFlowProcessClassKey, DefaultHadoop2TezFlowProcess) Hdfs(strictSources, config) } else throw ArgsException("[ERROR] Mode must be one of --local, --hadoop1, --hadoop2-mr1, --hadoop2-tez or --hdfs, you provided none") @@ -126,9 +140,18 @@ trait HadoopMode extends Mode { asMap - jarKey case None => asMap } - val clazz = Class.forName(jobConf.get("cascading.flow.connector.class", "cascading.flow.hadoop.HadoopFlowConnector")) - val ctor = clazz.getConstructor(classOf[java.util.Map[_, _]]) - ctor.newInstance(finalMap.asJava).asInstanceOf[FlowConnector] + + val flowConnectorClass = jobConf.get(Mode.CascadingFlowConnectorClassKey, Mode.DefaultHadoopFlowConnector) + + try { + val clazz = Class.forName(flowConnectorClass) + val ctor = clazz.getConstructor(classOf[java.util.Map[_, _]]) + ctor.newInstance(finalMap.asJava).asInstanceOf[FlowConnector] + } catch { + case ncd: NoClassDefFoundError => { + throw new ModeLoadException("Failed to load Cascading flow connector class " + flowConnectorClass, ncd) + } + } } // TODO unlike newFlowConnector, this does not look at the Job.config @@ -137,9 +160,19 @@ trait HadoopMode extends Mode { val conf = new JobConf(true) // initialize the default config // copy over Config config.toMap.foreach{ case (k, v) => conf.set(k, v) } - val clazz = Class.forName(jobConf.get("cascading.flow.process.class", "cascading.flow.hadoop.HadoopFlowProcess")) - val ctor = clazz.getConstructor(classOf[JobConf]) - val fp = ctor.newInstance(conf).asInstanceOf[FlowProcess[JobConf]] + + val flowProcessClass = jobConf.get(Mode.CascadingFlowProcessClassKey, Mode.DefaultHadoopFlowProcess) + + val fp = try { + val clazz = Class.forName(flowProcessClass) + val ctor = clazz.getConstructor(classOf[JobConf]) + ctor.newInstance(conf).asInstanceOf[FlowProcess[JobConf]] + } catch { + case ncd: NoClassDefFoundError => { + throw new ModeLoadException("Failed to load Cascading flow process class " + flowProcessClass, ncd) + } + } + htap.retrieveSourceFields(fp) htap.sourceConfInit(fp, conf) htap.openForRead(fp) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala b/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala index 8c17a7b10b..7f57fec9a8 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala @@ -27,11 +27,14 @@ object RichXHandler { val BinaryProblem = "GUESS: This may be a problem with the binary version of a dependency. " + "Check which versions of dependencies you're pulling in." + val RequiredCascadingFabricNotInClassPath = "GUESS: Required Cascading fabric is not supplied in the classpath." + + "Check which versions and variants of dependencies you're pulling in." + val DataIsMissing = "GUESS: Data is missing from the path you provided." val RequireSinks = "GUESS: Cascading requires all sources to have final sinks on disk." - val mapping: Map[Class[_ <: Throwable], String] = Map( + classOf[ModeLoadException] -> RequiredCascadingFabricNotInClassPath, classOf[NoClassDefFoundError] -> BinaryProblem, classOf[AbstractMethodError] -> BinaryProblem, classOf[NoSuchMethodError] -> BinaryProblem, diff --git a/scalding-core/src/test/scala/com/twitter/scalding/XHandlerTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/XHandlerTest.scala index ae962cf668..360f00e3b8 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/XHandlerTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/XHandlerTest.scala @@ -28,6 +28,7 @@ class XHandlerTest extends WordSpec with Matchers { rxh.handlers.find(h => h(new NoSuchMethodError)) should not be empty rxh.handlers.find(h => h(new AbstractMethodError)) should not be empty rxh.handlers.find(h => h(new NoClassDefFoundError)) should not be empty + rxh.handlers.find(h => h(new ModeLoadException("dummy", new NoClassDefFoundError))) should not be empty } "be handled if exist in custom mapping" in { val cRxh = RichXHandler(RichXHandler.mapping ++ Map(classOf[NullPointerException] -> "NPE")) @@ -41,6 +42,7 @@ class XHandlerTest extends WordSpec with Matchers { } "be valid keys in mapping if defined" in { val rxh = RichXHandler() + rxh.mapping(classOf[ModeLoadException]) shouldBe RichXHandler.RequiredCascadingFabricNotInClassPath rxh.mapping(classOf[PlannerException]) shouldBe RichXHandler.RequireSinks rxh.mapping(classOf[InvalidSourceException]) shouldBe RichXHandler.DataIsMissing rxh.mapping(classOf[NoSuchMethodError]) shouldBe RichXHandler.BinaryProblem @@ -54,11 +56,13 @@ class XHandlerTest extends WordSpec with Matchers { val NoSuchMethodErrorString = "javalangnosuchmethoderror" val InvalidSouceExceptionString = "comtwitterscaldinginvalidsourceexception" val PlannerExceptionString = "cascadingflowplannerplannerexception" + val ModeLoadExceptionString = "comtwitterscaldingmodeloadexception" RichXHandler.createXUrl(new PlannerException) shouldBe (RichXHandler.gitHubUrl + PlannerExceptionString) RichXHandler.createXUrl(new InvalidSourceException("Invalid Source")) shouldBe (RichXHandler.gitHubUrl + InvalidSouceExceptionString) RichXHandler.createXUrl(new NoSuchMethodError) shouldBe (RichXHandler.gitHubUrl + NoSuchMethodErrorString) RichXHandler.createXUrl(new AbstractMethodError) shouldBe (RichXHandler.gitHubUrl + AbstractMethodErrorString) RichXHandler.createXUrl(new NoClassDefFoundError) shouldBe (RichXHandler.gitHubUrl + NoClassDefFoundErrorString) + RichXHandler.createXUrl(ModeLoadException("dummy", new NoClassDefFoundError)) shouldBe (RichXHandler.gitHubUrl + ModeLoadExceptionString) } } } From 0dbbe2bcce2dd8b04f69b8a60106ba6e849db55c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?= Date: Mon, 9 Mar 2015 14:34:06 +0100 Subject: [PATCH 4/6] s/NoClassDefFoundError/ClassNotFoundException/ --- .../src/main/scala/com/twitter/scalding/Mode.scala | 6 +++--- .../src/test/scala/com/twitter/scalding/XHandlerTest.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala index d63d46ad9b..eb70fcdf9a 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala @@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory case class ModeException(message: String) extends RuntimeException(message) -case class ModeLoadException(message: String, origin: NoClassDefFoundError) extends RuntimeException(origin) +case class ModeLoadException(message: String, origin: ClassNotFoundException) extends RuntimeException(origin) object Mode { /** @@ -148,7 +148,7 @@ trait HadoopMode extends Mode { val ctor = clazz.getConstructor(classOf[java.util.Map[_, _]]) ctor.newInstance(finalMap.asJava).asInstanceOf[FlowConnector] } catch { - case ncd: NoClassDefFoundError => { + case ncd: ClassNotFoundException => { throw new ModeLoadException("Failed to load Cascading flow connector class " + flowConnectorClass, ncd) } } @@ -168,7 +168,7 @@ trait HadoopMode extends Mode { val ctor = clazz.getConstructor(classOf[JobConf]) ctor.newInstance(conf).asInstanceOf[FlowProcess[JobConf]] } catch { - case ncd: NoClassDefFoundError => { + case ncd: ClassNotFoundException => { throw new ModeLoadException("Failed to load Cascading flow process class " + flowProcessClass, ncd) } } diff --git a/scalding-core/src/test/scala/com/twitter/scalding/XHandlerTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/XHandlerTest.scala index 360f00e3b8..dd93ef5193 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/XHandlerTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/XHandlerTest.scala @@ -28,7 +28,7 @@ class XHandlerTest extends WordSpec with Matchers { rxh.handlers.find(h => h(new NoSuchMethodError)) should not be empty rxh.handlers.find(h => h(new AbstractMethodError)) should not be empty rxh.handlers.find(h => h(new NoClassDefFoundError)) should not be empty - rxh.handlers.find(h => h(new ModeLoadException("dummy", new NoClassDefFoundError))) should not be empty + rxh.handlers.find(h => h(new ModeLoadException("dummy", new ClassNotFoundException))) should not be empty } "be handled if exist in custom mapping" in { val cRxh = RichXHandler(RichXHandler.mapping ++ Map(classOf[NullPointerException] -> "NPE")) @@ -62,7 +62,7 @@ class XHandlerTest extends WordSpec with Matchers { RichXHandler.createXUrl(new NoSuchMethodError) shouldBe (RichXHandler.gitHubUrl + NoSuchMethodErrorString) RichXHandler.createXUrl(new AbstractMethodError) shouldBe (RichXHandler.gitHubUrl + AbstractMethodErrorString) RichXHandler.createXUrl(new NoClassDefFoundError) shouldBe (RichXHandler.gitHubUrl + NoClassDefFoundErrorString) - RichXHandler.createXUrl(ModeLoadException("dummy", new NoClassDefFoundError)) shouldBe (RichXHandler.gitHubUrl + ModeLoadExceptionString) + RichXHandler.createXUrl(ModeLoadException("dummy", new ClassNotFoundException)) shouldBe (RichXHandler.gitHubUrl + ModeLoadExceptionString) } } } From 0a496bff8297a602d393fdcbff7053edb673abfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?= Date: Wed, 11 Mar 2015 15:05:51 +0100 Subject: [PATCH 5/6] No longer peel off all wrapping exceptions, only unless a mapping is found --- .../src/main/scala/com/twitter/scalding/XHandler.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala b/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala index 7f57fec9a8..94e6d1d8c8 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala @@ -50,6 +50,14 @@ object RichXHandler { case cause => rootOf(cause) } + @annotation.tailrec + final def getMapping(t: Throwable): Option[String] = + (mapping.get(t.getClass), t.getCause) match { + case (Some(diag), _) => Some(diag) + case (None, null) => None + case (None, cause) => getMapping(cause) + } + def createXUrl(t: Throwable): String = gitHubUrl + (rootOf(t).getClass.getName.replace(".", "").toLowerCase) @@ -57,7 +65,7 @@ object RichXHandler { new XHandler(xMap, dVal) def apply(t: Throwable): String = - mapping.get(rootOf(t).getClass) + getMapping(t) .map(_ + "\n") .getOrElse("") + "If you know what exactly caused this error, please consider contributing to GitHub via following link.\n" + From 59899eb3275280892e1d16f31732ffb39cc69f55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?= Date: Tue, 17 Mar 2015 09:08:38 +0100 Subject: [PATCH 6/6] Peeling off exceptions also for building the github help url --- .../main/scala/com/twitter/scalding/XHandler.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala b/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala index 94e6d1d8c8..7b1fb7b519 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala @@ -33,6 +33,7 @@ object RichXHandler { val DataIsMissing = "GUESS: Data is missing from the path you provided." val RequireSinks = "GUESS: Cascading requires all sources to have final sinks on disk." + val mapping: Map[Class[_ <: Throwable], String] = Map( classOf[ModeLoadException] -> RequiredCascadingFabricNotInClassPath, classOf[NoClassDefFoundError] -> BinaryProblem, @@ -51,21 +52,21 @@ object RichXHandler { } @annotation.tailrec - final def getMapping(t: Throwable): Option[String] = + final def peelUntilMappable(t: Throwable): Class[_ <: Throwable] = (mapping.get(t.getClass), t.getCause) match { - case (Some(diag), _) => Some(diag) - case (None, null) => None - case (None, cause) => getMapping(cause) + case (Some(diag), _) => t.getClass // we're going to find a mappable cause. + case (None, null) => t.getClass // we're at the root. There won't be any cause + case (None, cause) => peelUntilMappable(cause) } def createXUrl(t: Throwable): String = - gitHubUrl + (rootOf(t).getClass.getName.replace(".", "").toLowerCase) + gitHubUrl + (peelUntilMappable(t).getName.replace(".", "").toLowerCase) def apply(xMap: Map[Class[_ <: Throwable], String] = mapping, dVal: String = Default) = new XHandler(xMap, dVal) def apply(t: Throwable): String = - getMapping(t) + mapping.get(peelUntilMappable(t)) .map(_ + "\n") .getOrElse("") + "If you know what exactly caused this error, please consider contributing to GitHub via following link.\n" +