Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 55 additions & 8 deletions scalding-core/src/main/scala/com/twitter/scalding/Mode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.hadoop.mapred.JobConf

import cascading.flow.FlowConnector
import cascading.flow.hadoop.HadoopFlowProcess
import cascading.flow.hadoop.HadoopFlowConnector
import cascading.flow.{ FlowProcess, FlowConnector, FlowDef, Flow }
import cascading.flow.local.LocalFlowConnector
import cascading.flow.local.LocalFlowProcess
import cascading.property.AppProps
Expand All @@ -43,6 +41,8 @@ import org.slf4j.LoggerFactory

/**
 * Unchecked exception whose detail message is the supplied `message`.
 *
 * @param message text reported through `getMessage`
 */
case class ModeException(message: String)
  extends RuntimeException(message)

/**
 * Raised when a class needed by the selected mode cannot be loaded.
 *
 * Both the descriptive message and the original failure are handed to
 * `RuntimeException`, so `getMessage` reports `message` (rather than the
 * cause's `toString`) while `getCause` still exposes `origin`.
 *
 * @param message human-readable description of what failed to load
 * @param origin  the `ClassNotFoundException` that triggered this error; kept as the cause
 */
case class ModeLoadException(message: String, origin: ClassNotFoundException)
  extends RuntimeException(message, origin)

object Mode {
/**
* This is a Args and a Mode together. It is used purely as
Expand All @@ -63,6 +63,18 @@ object Mode {
case _ => None
}

// Configuration keys under which the flow connector / flow process class names are stored.
val CascadingFlowConnectorClassKey: String = "cascading.flow.connector.class"
val CascadingFlowProcessClassKey: String = "cascading.flow.process.class"

// Class names used for the Hadoop (1.x) fabric.
val DefaultHadoopFlowConnector: String = "cascading.flow.hadoop.HadoopFlowConnector"
val DefaultHadoopFlowProcess: String = "cascading.flow.hadoop.HadoopFlowProcess"

// Class names used for the Hadoop 2 / MR1 fabric.
val DefaultHadoop2Mr1FlowConnector: String = "cascading.flow.hadoop2.Hadoop2MR1FlowConnector"
// Same process class as the Hadoop fabric:
// no Hadoop2MR1FlowProcess as of Cascading 3.0.0-wip-75?
val DefaultHadoop2Mr1FlowProcess: String = "cascading.flow.hadoop.HadoopFlowProcess"

// Class names used for the Hadoop 2 / Tez fabric.
val DefaultHadoop2TezFlowConnector: String = "cascading.flow.tez.Hadoop2TezFlowConnector"
val DefaultHadoop2TezFlowProcess: String = "cascading.flow.tez.Hadoop2TezFlowProcess"

// This should be passed ALL the args supplied after the job name
def apply(args: Args, config: Configuration): Mode = {
val strictSources = args.boolean("tool.partialok") == false
Expand All @@ -73,10 +85,22 @@ object Mode {

if (args.boolean("local"))
Local(strictSources)
else if (args.boolean("hdfs"))
else if (args.boolean("hdfs")) /* FIXME: should we start printing deprecation warnings ? It's okay to set manually c.f.*.class though */
Hdfs(strictSources, config)
else if (args.boolean("hadoop1")) {
config.set(CascadingFlowConnectorClassKey, DefaultHadoopFlowConnector)
config.set(CascadingFlowProcessClassKey, DefaultHadoopFlowProcess)
Hdfs(strictSources, config)
} else if (args.boolean("hadoop2-mr1")) {
config.set(CascadingFlowConnectorClassKey, DefaultHadoop2Mr1FlowConnector)
config.set(CascadingFlowProcessClassKey, DefaultHadoop2Mr1FlowProcess)
Hdfs(strictSources, config)
} else if (args.boolean("hadoop2-tez")) {
config.set(CascadingFlowConnectorClassKey, DefaultHadoop2TezFlowConnector)
config.set(CascadingFlowProcessClassKey, DefaultHadoop2TezFlowProcess)
Hdfs(strictSources, config)
else
throw ArgsException("[ERROR] Mode must be one of --local or --hdfs, you provided neither")
} else
throw ArgsException("[ERROR] Mode must be one of --local, --hadoop1, --hadoop2-mr1, --hadoop2-tez or --hdfs, you provided none")
}
}

Expand Down Expand Up @@ -116,7 +140,18 @@ trait HadoopMode extends Mode {
asMap - jarKey
case None => asMap
}
new HadoopFlowConnector(finalMap.asJava)

val flowConnectorClass = jobConf.get(Mode.CascadingFlowConnectorClassKey, Mode.DefaultHadoopFlowConnector)

try {
val clazz = Class.forName(flowConnectorClass)
val ctor = clazz.getConstructor(classOf[java.util.Map[_, _]])
ctor.newInstance(finalMap.asJava).asInstanceOf[FlowConnector]
} catch {
case ncd: ClassNotFoundException => {
throw new ModeLoadException("Failed to load Cascading flow connector class " + flowConnectorClass, ncd)
}
}
}

// TODO unlike newFlowConnector, this does not look at the Job.config
Expand All @@ -125,7 +160,19 @@ trait HadoopMode extends Mode {
val conf = new JobConf(true) // initialize the default config
// copy over Config
config.toMap.foreach{ case (k, v) => conf.set(k, v) }
val fp = new HadoopFlowProcess(conf)

val flowProcessClass = jobConf.get(Mode.CascadingFlowProcessClassKey, Mode.DefaultHadoopFlowProcess)

val fp = try {
val clazz = Class.forName(flowProcessClass)
val ctor = clazz.getConstructor(classOf[JobConf])
ctor.newInstance(conf).asInstanceOf[FlowProcess[JobConf]]
} catch {
case ncd: ClassNotFoundException => {
throw new ModeLoadException("Failed to load Cascading flow process class " + flowProcessClass, ncd)
}
}

htap.retrieveSourceFields(fp)
htap.sourceConfInit(fp, conf)
htap.openForRead(fp)
Expand Down
16 changes: 14 additions & 2 deletions scalding-core/src/main/scala/com/twitter/scalding/XHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@ object RichXHandler {
val BinaryProblem = "GUESS: This may be a problem with the binary version of a dependency. " +
"Check which versions of dependencies you're pulling in."

// Hint associated with ModeLoadException in `mapping`.
// The first fragment ends with a space so the two sentences do not run together.
val RequiredCascadingFabricNotInClassPath = "GUESS: Required Cascading fabric is not supplied in the classpath. " +
  "Check which versions and variants of dependencies you're pulling in."

val DataIsMissing = "GUESS: Data is missing from the path you provided."

val RequireSinks = "GUESS: Cascading requires all sources to have final sinks on disk."

val mapping: Map[Class[_ <: Throwable], String] = Map(
classOf[ModeLoadException] -> RequiredCascadingFabricNotInClassPath,
classOf[NoClassDefFoundError] -> BinaryProblem,
classOf[AbstractMethodError] -> BinaryProblem,
classOf[NoSuchMethodError] -> BinaryProblem,
Expand All @@ -47,14 +51,22 @@ object RichXHandler {
case cause => rootOf(cause)
}

/**
 * Walks the cause chain of `t`, starting at `t` itself, and returns the class
 * of the first throwable whose exact class is a key of `mapping`. If no
 * throwable in the chain is mapped, the class of the last one (the one whose
 * cause is `null`) is returned.
 *
 * @param t the throwable to inspect
 * @return the class of the first mapped throwable, or of the end of the chain
 */
@annotation.tailrec
final def peelUntilMappable(t: Throwable): Class[_ <: Throwable] = {
  val current: Class[_ <: Throwable] = t.getClass
  val next = t.getCause
  // Stop as soon as we hit a mapped class or run out of causes.
  if (mapping.contains(current) || next == null) current
  else peelUntilMappable(next)
}

def createXUrl(t: Throwable): String =
gitHubUrl + (rootOf(t).getClass.getName.replace(".", "").toLowerCase)
gitHubUrl + (peelUntilMappable(t).getName.replace(".", "").toLowerCase)

/**
 * Creates an `XHandler` from the given arguments.
 *
 * @param xMap map from throwable class to message; defaults to `mapping`
 * @param dVal string handed to `XHandler` as its second argument; defaults to `Default`
 * @return a new `XHandler` constructed from `xMap` and `dVal`
 */
def apply(
  xMap: Map[Class[_ <: Throwable], String] = mapping,
  dVal: String = Default): XHandler =
  new XHandler(xMap, dVal)

def apply(t: Throwable): String =
mapping.get(rootOf(t).getClass)
mapping.get(peelUntilMappable(t))
.map(_ + "\n")
.getOrElse("") +
"If you know what exactly caused this error, please consider contributing to GitHub via following link.\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class XHandlerTest extends WordSpec with Matchers {
rxh.handlers.find(h => h(new NoSuchMethodError)) should not be empty
rxh.handlers.find(h => h(new AbstractMethodError)) should not be empty
rxh.handlers.find(h => h(new NoClassDefFoundError)) should not be empty
rxh.handlers.find(h => h(new ModeLoadException("dummy", new ClassNotFoundException))) should not be empty
}
"be handled if exist in custom mapping" in {
val cRxh = RichXHandler(RichXHandler.mapping ++ Map(classOf[NullPointerException] -> "NPE"))
Expand All @@ -41,6 +42,7 @@ class XHandlerTest extends WordSpec with Matchers {
}
"be valid keys in mapping if defined" in {
val rxh = RichXHandler()
rxh.mapping(classOf[ModeLoadException]) shouldBe RichXHandler.RequiredCascadingFabricNotInClassPath
rxh.mapping(classOf[PlannerException]) shouldBe RichXHandler.RequireSinks
rxh.mapping(classOf[InvalidSourceException]) shouldBe RichXHandler.DataIsMissing
rxh.mapping(classOf[NoSuchMethodError]) shouldBe RichXHandler.BinaryProblem
Expand All @@ -54,11 +56,13 @@ class XHandlerTest extends WordSpec with Matchers {
val NoSuchMethodErrorString = "javalangnosuchmethoderror"
val InvalidSouceExceptionString = "comtwitterscaldinginvalidsourceexception"
val PlannerExceptionString = "cascadingflowplannerplannerexception"
val ModeLoadExceptionString = "comtwitterscaldingmodeloadexception"
RichXHandler.createXUrl(new PlannerException) shouldBe (RichXHandler.gitHubUrl + PlannerExceptionString)
RichXHandler.createXUrl(new InvalidSourceException("Invalid Source")) shouldBe (RichXHandler.gitHubUrl + InvalidSouceExceptionString)
RichXHandler.createXUrl(new NoSuchMethodError) shouldBe (RichXHandler.gitHubUrl + NoSuchMethodErrorString)
RichXHandler.createXUrl(new AbstractMethodError) shouldBe (RichXHandler.gitHubUrl + AbstractMethodErrorString)
RichXHandler.createXUrl(new NoClassDefFoundError) shouldBe (RichXHandler.gitHubUrl + NoClassDefFoundErrorString)
RichXHandler.createXUrl(ModeLoadException("dummy", new ClassNotFoundException)) shouldBe (RichXHandler.gitHubUrl + ModeLoadExceptionString)
}
}
}