diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala
index 71e539fc3d..eb70fcdf9a 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala
@@ -22,9 +22,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{ FileSystem, Path }
 import org.apache.hadoop.mapred.JobConf
 
-import cascading.flow.FlowConnector
-import cascading.flow.hadoop.HadoopFlowProcess
-import cascading.flow.hadoop.HadoopFlowConnector
+import cascading.flow.{ FlowProcess, FlowConnector, FlowDef, Flow }
 import cascading.flow.local.LocalFlowConnector
 import cascading.flow.local.LocalFlowProcess
 import cascading.property.AppProps
@@ -43,6 +41,12 @@ import org.slf4j.LoggerFactory
 
 case class ModeException(message: String) extends RuntimeException(message)
 
+/**
+ * Thrown when a Cascading flow connector / flow process class named in the job configuration cannot be loaded.
+ * Both `message` and the originating `ClassNotFoundException` are handed to `RuntimeException`, so `getMessage` reports `message`.
+ */
+case class ModeLoadException(message: String, origin: ClassNotFoundException) extends RuntimeException(message, origin)
+
 object Mode {
   /**
    * This is a Args and a Mode together. It is used purely as
@@ -63,6 +67,19 @@ object Mode {
     case _ => None
   }
 
+  // Configuration keys naming the Cascading FlowConnector / FlowProcess classes that HadoopMode loads reflectively.
+  val CascadingFlowConnectorClassKey = "cascading.flow.connector.class"
+  val CascadingFlowProcessClassKey = "cascading.flow.process.class"
+
+  val DefaultHadoopFlowConnector = "cascading.flow.hadoop.HadoopFlowConnector"
+  val DefaultHadoopFlowProcess = "cascading.flow.hadoop.HadoopFlowProcess"
+
+  val DefaultHadoop2Mr1FlowConnector = "cascading.flow.hadoop2.Hadoop2MR1FlowConnector"
+  val DefaultHadoop2Mr1FlowProcess = "cascading.flow.hadoop.HadoopFlowProcess" // no Hadoop2MR1FlowProcess as of Cascading 3.0.0-wip-75?
+
+  val DefaultHadoop2TezFlowConnector = "cascading.flow.tez.Hadoop2TezFlowConnector"
+  val DefaultHadoop2TezFlowProcess = "cascading.flow.tez.Hadoop2TezFlowProcess"
+
   // This should be passed ALL the args supplied after the job name
   def apply(args: Args, config: Configuration): Mode = {
     val strictSources = args.boolean("tool.partialok") == false
@@ -73,10 +90,22 @@ object Mode {
 
     if (args.boolean("local"))
       Local(strictSources)
-    else if (args.boolean("hdfs"))
+    else if (args.boolean("hdfs")) /* FIXME: should we start printing deprecation warnings ? It's okay to set manually c.f.*.class though */
+      Hdfs(strictSources, config)
+    else if (args.boolean("hadoop1")) {
+      config.set(CascadingFlowConnectorClassKey, DefaultHadoopFlowConnector)
+      config.set(CascadingFlowProcessClassKey, DefaultHadoopFlowProcess)
+      Hdfs(strictSources, config)
+    } else if (args.boolean("hadoop2-mr1")) {
+      config.set(CascadingFlowConnectorClassKey, DefaultHadoop2Mr1FlowConnector)
+      config.set(CascadingFlowProcessClassKey, DefaultHadoop2Mr1FlowProcess)
+      Hdfs(strictSources, config)
+    } else if (args.boolean("hadoop2-tez")) {
+      config.set(CascadingFlowConnectorClassKey, DefaultHadoop2TezFlowConnector)
+      config.set(CascadingFlowProcessClassKey, DefaultHadoop2TezFlowProcess)
       Hdfs(strictSources, config)
-    else
-      throw ArgsException("[ERROR] Mode must be one of --local or --hdfs, you provided neither")
+    } else
+      throw ArgsException("[ERROR] Mode must be one of --local, --hadoop1, --hadoop2-mr1, --hadoop2-tez or --hdfs, you provided none")
   }
 }
 
@@ -116,7 +145,18 @@ trait HadoopMode extends Mode {
         asMap - jarKey
       case None => asMap
     }
-    new HadoopFlowConnector(finalMap.asJava)
+
+    val flowConnectorClass = jobConf.get(Mode.CascadingFlowConnectorClassKey, Mode.DefaultHadoopFlowConnector)
+
+    try {
+      val clazz = Class.forName(flowConnectorClass)
+      val ctor = clazz.getConstructor(classOf[java.util.Map[_, _]])
+      ctor.newInstance(finalMap.asJava).asInstanceOf[FlowConnector]
+    } catch {
+      case ncd: ClassNotFoundException => {
+        throw new ModeLoadException("Failed to load Cascading flow connector class " + flowConnectorClass, ncd)
+      }
+    }
   }
 
   // TODO unlike newFlowConnector, this does not look at the Job.config
@@ -125,7 +165,19 @@ trait HadoopMode extends Mode {
     val conf = new JobConf(true) // initialize the default config
     // copy over Config
     config.toMap.foreach{ case (k, v) => conf.set(k, v) }
-    val fp = new HadoopFlowProcess(conf)
+
+    val flowProcessClass = jobConf.get(Mode.CascadingFlowProcessClassKey, Mode.DefaultHadoopFlowProcess)
+
+    val fp = try {
+      val clazz = Class.forName(flowProcessClass)
+      val ctor = clazz.getConstructor(classOf[JobConf])
+      ctor.newInstance(conf).asInstanceOf[FlowProcess[JobConf]]
+    } catch {
+      case ncd: ClassNotFoundException => {
+        throw new ModeLoadException("Failed to load Cascading flow process class " + flowProcessClass, ncd)
+      }
+    }
+
     htap.retrieveSourceFields(fp)
     htap.sourceConfInit(fp, conf)
     htap.openForRead(fp)
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala b/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala
index 8c17a7b10b..7b1fb7b519 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala
@@ -27,11 +27,15 @@ object RichXHandler {
   val BinaryProblem = "GUESS: This may be a problem with the binary version of a dependency. " +
     "Check which versions of dependencies you're pulling in."
 
+  val RequiredCascadingFabricNotInClassPath = "GUESS: Required Cascading fabric is not supplied in the classpath. " +
+    "Check which versions and variants of dependencies you're pulling in."
+
   val DataIsMissing = "GUESS: Data is missing from the path you provided."
 
   val RequireSinks = "GUESS: Cascading requires all sources to have final sinks on disk."
 
   val mapping: Map[Class[_ <: Throwable], String] = Map(
+    classOf[ModeLoadException] -> RequiredCascadingFabricNotInClassPath,
     classOf[NoClassDefFoundError] -> BinaryProblem,
     classOf[AbstractMethodError] -> BinaryProblem,
     classOf[NoSuchMethodError] -> BinaryProblem,
@@ -47,14 +51,26 @@ object RichXHandler {
       case cause => rootOf(cause)
     }
 
+  /**
+   * Walks the cause chain starting at `t` and returns the class of the first throwable that is a key of
+   * `mapping`; when no throwable in the chain is mapped, returns the class of the last one (the root cause).
+   */
+  @annotation.tailrec
+  final def peelUntilMappable(t: Throwable): Class[_ <: Throwable] =
+    (mapping.get(t.getClass), t.getCause) match {
+      case (Some(_), _) => t.getClass // we're going to find a mappable cause.
+      case (None, null) => t.getClass // we're at the root. There won't be any cause
+      case (None, cause) => peelUntilMappable(cause)
+    }
+
   def createXUrl(t: Throwable): String =
-    gitHubUrl + (rootOf(t).getClass.getName.replace(".", "").toLowerCase)
+    gitHubUrl + (peelUntilMappable(t).getName.replace(".", "").toLowerCase)
 
   def apply(xMap: Map[Class[_ <: Throwable], String] = mapping, dVal: String = Default) =
     new XHandler(xMap, dVal)
 
   def apply(t: Throwable): String =
-    mapping.get(rootOf(t).getClass)
+    mapping.get(peelUntilMappable(t))
       .map(_ + "\n")
       .getOrElse("") +
       "If you know what exactly caused this error, please consider contributing to GitHub via following link.\n" +
diff --git a/scalding-core/src/test/scala/com/twitter/scalding/XHandlerTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/XHandlerTest.scala
index ae962cf668..dd93ef5193 100644
--- a/scalding-core/src/test/scala/com/twitter/scalding/XHandlerTest.scala
+++ b/scalding-core/src/test/scala/com/twitter/scalding/XHandlerTest.scala
@@ -28,6 +28,7 @@ class XHandlerTest extends WordSpec with Matchers {
       rxh.handlers.find(h => h(new NoSuchMethodError)) should not be empty
       rxh.handlers.find(h => h(new AbstractMethodError)) should not be empty
       rxh.handlers.find(h => h(new NoClassDefFoundError)) should not be empty
+      rxh.handlers.find(h => h(new ModeLoadException("dummy", new ClassNotFoundException))) should not be empty
     }
     "be handled if exist in custom mapping" in {
       val cRxh = RichXHandler(RichXHandler.mapping ++ Map(classOf[NullPointerException] -> "NPE"))
@@ -41,6 +42,7 @@ class XHandlerTest extends WordSpec with Matchers {
     }
     "be valid keys in mapping if defined" in {
       val rxh = RichXHandler()
+      rxh.mapping(classOf[ModeLoadException]) shouldBe RichXHandler.RequiredCascadingFabricNotInClassPath
       rxh.mapping(classOf[PlannerException]) shouldBe RichXHandler.RequireSinks
       rxh.mapping(classOf[InvalidSourceException]) shouldBe RichXHandler.DataIsMissing
       rxh.mapping(classOf[NoSuchMethodError]) shouldBe RichXHandler.BinaryProblem
@@ -54,11 +56,13 @@ class XHandlerTest extends WordSpec with Matchers {
       val NoSuchMethodErrorString = "javalangnosuchmethoderror"
       val InvalidSouceExceptionString = "comtwitterscaldinginvalidsourceexception"
       val PlannerExceptionString = "cascadingflowplannerplannerexception"
+      val ModeLoadExceptionString = "comtwitterscaldingmodeloadexception"
       RichXHandler.createXUrl(new PlannerException) shouldBe (RichXHandler.gitHubUrl + PlannerExceptionString)
       RichXHandler.createXUrl(new InvalidSourceException("Invalid Source")) shouldBe (RichXHandler.gitHubUrl + InvalidSouceExceptionString)
       RichXHandler.createXUrl(new NoSuchMethodError) shouldBe (RichXHandler.gitHubUrl + NoSuchMethodErrorString)
       RichXHandler.createXUrl(new AbstractMethodError) shouldBe (RichXHandler.gitHubUrl + AbstractMethodErrorString)
       RichXHandler.createXUrl(new NoClassDefFoundError) shouldBe (RichXHandler.gitHubUrl + NoClassDefFoundErrorString)
+      RichXHandler.createXUrl(ModeLoadException("dummy", new ClassNotFoundException)) shouldBe (RichXHandler.gitHubUrl + ModeLoadExceptionString)
     }
   }
 }