Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
# What spark deploy mode Livy sessions should use.
# livy.spark.deploy-mode =

# What default YARN queue Livy sessions should use if not specified in the client request.
# By default, it is null and falls back to the global Hadoop cluster default queue.
# livy.spark.yarn.queue = default

# Configure Livy server http request and response header size.
# livy.server.request-header.size = 131072
# livy.server.response-header.size = 131072
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ object LivyConf {

val TEST_MODE = ClientConf.TEST_MODE

val SPARK_YARN_QUEUE = Entry("livy.spark.yarn.queue", null)
Comment thread
nileshrathi345 marked this conversation as resolved.
val SPARK_HOME = Entry("livy.server.spark-home", null)
val LIVY_SPARK_MASTER = Entry("livy.spark.master", "local")
val LIVY_SPARK_DEPLOY_MODE = Entry("livy.spark.deploy-mode", null)
Expand Down Expand Up @@ -485,6 +486,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
/** Return the spark master Livy sessions should use. */
def sparkMaster(): String = get(LIVY_SPARK_MASTER)

/** Return the value of spark yarn queue. */
def getYarnQueue(): Option[String] = Option(get(SPARK_YARN_QUEUE))

/** Return the path to the spark-submit executable. */
def sparkSubmit(): String = {
sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ object BatchSession extends Logging {
request.executorMemory.foreach(builder.executorMemory)
request.executorCores.foreach(builder.executorCores)
request.numExecutors.foreach(builder.numExecutors)
request.queue.foreach(builder.queue)
request.queue.orElse(livyConf.getYarnQueue()).foreach(builder.queue)
request.name.foreach(builder.name)

sessionStore.save(BatchSession.RECOVERY_SESSION_TYPE, s.recoveryMetadata)
Expand Down Expand Up @@ -120,6 +120,7 @@ object BatchSession extends Logging {
owner,
impersonatedUser,
sessionStore,
request.queue.orElse(livyConf.getYarnQueue()),
mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp))
}

Expand All @@ -137,6 +138,7 @@ object BatchSession extends Logging {
m.owner,
m.proxyUser,
sessionStore,
None,
mockApp.map { m => (_: BatchSession) => m }.getOrElse { s =>
SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s))
})
Expand All @@ -152,6 +154,7 @@ class BatchSession(
owner: String,
override val proxyUser: Option[String],
sessionStore: SessionStore,
val queue: Option[String],
sparkApp: BatchSession => SparkApp)
extends Session(id, name, owner, livyConf) with SparkAppListener {
import BatchSession._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ object InteractiveSession extends Logging {
SparkLauncher.EXECUTOR_MEMORY -> request.executorMemory.map(_.toString),
"spark.executor.instances" -> request.numExecutors.map(_.toString),
"spark.app.name" -> request.name.map(_.toString),
"spark.yarn.queue" -> request.queue
"spark.yarn.queue" -> request.queue.orElse(livyConf.getYarnQueue())
)

userOpts.foreach { case (key, opt) =>
Expand Down Expand Up @@ -152,7 +152,7 @@ object InteractiveSession extends Logging {
request.jars,
request.numExecutors,
request.pyFiles,
request.queue,
request.queue.orElse(livyConf.getYarnQueue()),
mockApp)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,65 @@ class BatchSessionSpec
}) should be (true)
}

it("should inherit the default YARN queue from LivyConf when request queue is empty") {
val req = new CreateBatchRequest()
req.file = script.toString
req.queue = None // Explicitly empty
req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path"))

// Set default queue in LivyConf configuration
val conf = new LivyConf()
.set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir"))
.set(LivyConf.SPARK_YARN_QUEUE, "livy-default-batch-queue")

val accessManager = new AccessManager(conf)
val mockApp = mock[SparkApp]

val batch = BatchSession.create(
id = 10,
name = None,
request = req,
livyConf = conf,
accessManager = accessManager,
owner = null,
proxyUser = None,
sessionStore = sessionStore,
mockApp = Some(mockApp)
)

// Verify that the batch session structure captured the fallback queue configuration
batch.queue shouldBe Some("livy-default-batch-queue")
}

it("should prioritize user-specified request queue over LivyConf configuration") {
val req = new CreateBatchRequest()
req.file = script.toString
req.queue = Some("user-custom-batch-queue") // Explicitly requested by user
req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path"))

val conf = new LivyConf()
.set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir"))
.set(LivyConf.SPARK_YARN_QUEUE, "livy-default-batch-queue")

val accessManager = new AccessManager(conf)
val mockApp = mock[SparkApp]

val batch = BatchSession.create(
id = 20,
name = None,
request = req,
livyConf = conf,
accessManager = accessManager,
owner = null,
proxyUser = None,
sessionStore = sessionStore,
mockApp = Some(mockApp)
)

// Verify user context takes absolute priority over fallback definition
batch.queue shouldBe Some("user-custom-batch-queue")
}

def testRecoverSession(name: Option[String]): Unit = {
val conf = new LivyConf()
val req = new CreateBatchRequest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,71 @@ class InteractiveSessionSpec extends FunSpec
s.logLines().mkString should include("RSCDriver URI is unknown")
}
}

describe("InteractiveSession") {
it("should inherit the default YARN queue from LivyConf when request queue is empty") {
// Create a clean instance of LivyConf for this isolated check
val testLivyConf = new LivyConf()
.set(LivyConf.REPL_JARS, "dummy.jar")
.set(LivyConf.SPARK_YARN_QUEUE, "livy-default-queue")

val req = new CreateInteractiveRequest()
req.kind = Spark
req.queue = None // Explicitly empty
req.conf = Map(RSCConf.Entry.LIVY_JARS.key() -> "")

// Create a mock RSCClient to avoid spawning a real background spark-submit process
val mockClient = Some(mock[RSCClient])

val s = InteractiveSession.create(
id = 101,
name = None,
owner = "systest",
proxyUser = None,
livyConf = testLivyConf,
accessManager = accessManager,
request = req,
sessionStore = mock[SessionStore],
ttl = None,
idleTimeout = None,
mockApp = None,
mockClient = mockClient
)

// Verify that the internal session state correctly holds the fallback queue string
s.queue shouldBe Some("livy-default-queue")
}

it("should prioritize user-specified request queue over LivyConf global configuration") {
val testLivyConf = new LivyConf()
.set(LivyConf.REPL_JARS, "dummy.jar")
.set(LivyConf.SPARK_YARN_QUEUE, "livy-default-queue")

val req = new CreateInteractiveRequest()
req.kind = Spark
req.queue = Some("user-custom-queue") // Explicitly provided by request
req.conf = Map(RSCConf.Entry.LIVY_JARS.key() -> "")

val mockClient = Some(mock[RSCClient])

val s = InteractiveSession.create(
id = 102,
name = None,
owner = "systest",
proxyUser = None,
livyConf = testLivyConf,
accessManager = accessManager,
request = req,
sessionStore = mock[SessionStore],
ttl = None,
idleTimeout = None,
mockApp = None,
mockClient = mockClient
)

// Verify that user context takes absolute priority over fallback definitions
s.queue shouldBe Some("user-custom-queue")
}

}
}
Loading