diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 0299dca7f..30a3a1f40 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -47,6 +47,10 @@ # What spark deploy mode Livy sessions should use. # livy.spark.deploy-mode = +# What default YARN queue Livy sessions should use if not specified in the client request. +# By default, it is null and falls back to the global Hadoop cluster default queue. +# livy.spark.yarn.queue = default + # Configure Livy server http request and response header size. # livy.server.request-header.size = 131072 # livy.server.response-header.size = 131072 diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 006ab5dff..8dbfdd023 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -41,6 +41,7 @@ object LivyConf { val TEST_MODE = ClientConf.TEST_MODE + val SPARK_YARN_QUEUE = Entry("livy.spark.yarn.queue", null) val SPARK_HOME = Entry("livy.server.spark-home", null) val LIVY_SPARK_MASTER = Entry("livy.spark.master", "local") val LIVY_SPARK_DEPLOY_MODE = Entry("livy.spark.deploy-mode", null) @@ -485,6 +486,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { /** Return the spark master Livy sessions should use. */ def sparkMaster(): String = get(LIVY_SPARK_MASTER) + /** Return the value of spark yarn queue. */ + def getYarnQueue(): Option[String] = Option(get(SPARK_YARN_QUEUE)) + /** Return the path to the spark-submit executable. */ def sparkSubmit(): String = { sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 8b64a0398..0019a5810 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -83,7 +83,7 @@ object BatchSession extends Logging { request.executorMemory.foreach(builder.executorMemory) request.executorCores.foreach(builder.executorCores) request.numExecutors.foreach(builder.numExecutors) - request.queue.foreach(builder.queue) + request.queue.orElse(livyConf.getYarnQueue()).foreach(builder.queue) request.name.foreach(builder.name) sessionStore.save(BatchSession.RECOVERY_SESSION_TYPE, s.recoveryMetadata) @@ -120,6 +120,7 @@ object BatchSession extends Logging { owner, impersonatedUser, sessionStore, + request.queue.orElse(livyConf.getYarnQueue()), mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp)) } @@ -137,6 +138,7 @@ object BatchSession extends Logging { m.owner, m.proxyUser, sessionStore, + None, mockApp.map { m => (_: BatchSession) => m }.getOrElse { s => SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s)) }) @@ -152,6 +154,7 @@ class BatchSession( owner: String, override val proxyUser: Option[String], sessionStore: SessionStore, + val queue: Option[String], sparkApp: BatchSession => SparkApp) extends Session(id, name, owner, livyConf) with SparkAppListener { import BatchSession._ diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 0667b718c..c6b62309d 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -107,7 +107,7 @@ object InteractiveSession extends Logging { SparkLauncher.EXECUTOR_MEMORY -> request.executorMemory.map(_.toString), "spark.executor.instances" -> request.numExecutors.map(_.toString), "spark.app.name" -> request.name.map(_.toString), - "spark.yarn.queue" -> request.queue + "spark.yarn.queue" -> request.queue.orElse(livyConf.getYarnQueue()) ) userOpts.foreach { case (key, opt) => @@ -152,7 +152,7 @@ object InteractiveSession extends Logging { request.jars, request.numExecutors, request.pyFiles, - request.queue, + request.queue.orElse(livyConf.getYarnQueue()), mockApp) } diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index 401a8beb1..d22dea256 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -138,6 +138,65 @@ class BatchSessionSpec }) should be (true) } + it("should inherit the default YARN queue from LivyConf when request queue is empty") { + val req = new CreateBatchRequest() + req.file = script.toString + req.queue = None // Explicitly empty + req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path")) + + // Set default queue in LivyConf configuration + val conf = new LivyConf() + .set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir")) + .set(LivyConf.SPARK_YARN_QUEUE, "livy-default-batch-queue") + + val accessManager = new AccessManager(conf) + val mockApp = mock[SparkApp] + + val batch = BatchSession.create( + id = 10, + name = None, + request = req, + livyConf = conf, + accessManager = accessManager, + owner = null, + proxyUser = None, + sessionStore = sessionStore, + mockApp = Some(mockApp) + ) + + // Verify that the batch session structure captured the fallback queue configuration + batch.queue shouldBe Some("livy-default-batch-queue") + } + + it("should prioritize user-specified request queue over LivyConf configuration") { + val req = new CreateBatchRequest() + req.file = script.toString + req.queue = Some("user-custom-batch-queue") // Explicitly requested by user + req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path")) + + val conf = new LivyConf() + .set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir")) + .set(LivyConf.SPARK_YARN_QUEUE, "livy-default-batch-queue") + + val accessManager = new AccessManager(conf) + val mockApp = mock[SparkApp] + + val batch = BatchSession.create( + id = 20, + name = None, + request = req, + livyConf = conf, + accessManager = accessManager, + owner = null, + proxyUser = None, + sessionStore = sessionStore, + mockApp = Some(mockApp) + ) + + // Verify user context takes absolute priority over fallback definition + batch.queue shouldBe Some("user-custom-batch-queue") + } + def testRecoverSession(name: Option[String]): Unit = { val conf = new LivyConf() val req = new CreateBatchRequest() diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index e7d651f89..d443dea6b 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ -322,4 +322,71 @@ class InteractiveSessionSpec extends FunSpec s.logLines().mkString should include("RSCDriver URI is unknown") } } + + describe("InteractiveSession") { + it("should inherit the default YARN queue from LivyConf when request queue is empty") { + // Create a clean instance of LivyConf for this isolated check + val testLivyConf = new LivyConf() + .set(LivyConf.REPL_JARS, "dummy.jar") + .set(LivyConf.SPARK_YARN_QUEUE, "livy-default-queue") + + val req = new CreateInteractiveRequest() + req.kind = Spark + req.queue = None // Explicitly empty + req.conf = Map(RSCConf.Entry.LIVY_JARS.key() -> "") + + // Create a mock RSCClient to avoid spawning a real background spark-submit process + val mockClient = Some(mock[RSCClient]) + + val s = InteractiveSession.create( + id = 101, + name = None, + owner = "systest", + proxyUser = None, + livyConf = testLivyConf, + accessManager = accessManager, + request = req, + sessionStore = mock[SessionStore], + ttl = None, + idleTimeout = None, + mockApp = None, + mockClient = mockClient + ) + + // Verify that the internal session state correctly holds the fallback queue string + s.queue shouldBe Some("livy-default-queue") + } + + it("should prioritize user-specified request queue over LivyConf global configuration") { + val testLivyConf = new LivyConf() + .set(LivyConf.REPL_JARS, "dummy.jar") + .set(LivyConf.SPARK_YARN_QUEUE, "livy-default-queue") + + val req = new CreateInteractiveRequest() + req.kind = Spark + req.queue = Some("user-custom-queue") // Explicitly provided by request + req.conf = Map(RSCConf.Entry.LIVY_JARS.key() -> "") + + val mockClient = Some(mock[RSCClient]) + + val s = InteractiveSession.create( + id = 102, + name = None, + owner = "systest", + proxyUser = None, + livyConf = testLivyConf, + accessManager = accessManager, + request = req, + sessionStore = mock[SessionStore], + ttl = None, + idleTimeout = None, + mockApp = None, + mockClient = mockClient + ) + + // Verify that user context takes absolute priority over fallback definitions + s.queue shouldBe Some("user-custom-queue") + } + + } }