From 25ab8f5e88b307c4346c80902e6678df30a240fd Mon Sep 17 00:00:00 2001 From: "nilesh.rathi" Date: Tue, 23 Jun 2026 15:54:27 +0530 Subject: [PATCH 1/4] [LIVY-1059] Add livy.spark.yarn.queue configuration to set default YARN queue --- .../main/scala/org/apache/livy/LivyConf.scala | 4 + .../livy/server/batch/BatchSession.scala | 8 +- .../server/batch/BatchSessionServlet.scala | 3 +- .../interactive/InteractiveSession.scala | 4 +- .../livy/server/batch/BatchSessionSpec.scala | 77 +++++++++++++++++++ .../interactive/InteractiveSessionSpec.scala | 67 ++++++++++++++++ 6 files changed, 158 insertions(+), 5 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 006ab5dff..8dbfdd023 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -41,6 +41,7 @@ object LivyConf { val TEST_MODE = ClientConf.TEST_MODE + val SPARK_YARN_QUEUE = Entry("livy.spark.yarn.queue", null) val SPARK_HOME = Entry("livy.server.spark-home", null) val LIVY_SPARK_MASTER = Entry("livy.spark.master", "local") val LIVY_SPARK_DEPLOY_MODE = Entry("livy.spark.deploy-mode", null) @@ -485,6 +486,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { /** Return the spark master Livy sessions should use. */ def sparkMaster(): String = get(LIVY_SPARK_MASTER) + /** Return the value of spark yarn queue. */ + def getYarnQueue(): Option[String] = Option(get(SPARK_YARN_QUEUE)) + /** Return the path to the spark-submit executable. */ def sparkSubmit(): String = { sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 8b64a0398..05faba83a 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -40,6 +40,7 @@ case class BatchRecoveryMetadata( appTag: String, owner: String, proxyUser: Option[String], + queue: Option[String] = None, version: Int = 1) extends RecoveryMetadata @@ -83,7 +84,7 @@ object BatchSession extends Logging { request.executorMemory.foreach(builder.executorMemory) request.executorCores.foreach(builder.executorCores) request.numExecutors.foreach(builder.numExecutors) - request.queue.foreach(builder.queue) + request.queue.orElse(livyConf.getYarnQueue()).foreach(builder.queue) request.name.foreach(builder.name) sessionStore.save(BatchSession.RECOVERY_SESSION_TYPE, s.recoveryMetadata) @@ -120,6 +121,7 @@ object BatchSession extends Logging { owner, impersonatedUser, sessionStore, + request.queue.orElse(livyConf.getYarnQueue()), mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp)) } @@ -137,6 +139,7 @@ object BatchSession extends Logging { m.owner, m.proxyUser, sessionStore, + m.queue, mockApp.map { m => (_: BatchSession) => m }.getOrElse { s => SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s)) }) @@ -152,6 +155,7 @@ class BatchSession( owner: String, override val proxyUser: Option[String], sessionStore: SessionStore, + val queue: Option[String], sparkApp: BatchSession => SparkApp) extends Session(id, name, owner, livyConf) with SparkAppListener { import BatchSession._ @@ -204,5 +208,5 @@ class BatchSession( override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo } override def recoveryMetadata: RecoveryMetadata = - BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser) + BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser, queue) } diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala index d14e649f0..7cd4901ab 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala @@ -33,6 +33,7 @@ case class BatchSessionView( state: String, appId: Option[String], appInfo: AppInfo, + queue: Option[String], log: Seq[String]) class BatchSessionServlet( @@ -76,7 +77,7 @@ class BatchSessionServlet( Nil } BatchSessionView(session.id, session.name, session.owner, session.proxyUser, - session.state.toString, session.appId, session.appInfo, logs) + session.state.toString, session.appId, session.appInfo, session.queue, logs) } } diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 0667b718c..c6b62309d 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -107,7 +107,7 @@ object InteractiveSession extends Logging { SparkLauncher.EXECUTOR_MEMORY -> request.executorMemory.map(_.toString), "spark.executor.instances" -> request.numExecutors.map(_.toString), "spark.app.name" -> request.name.map(_.toString), - "spark.yarn.queue" -> request.queue + "spark.yarn.queue" -> request.queue.orElse(livyConf.getYarnQueue()) ) userOpts.foreach { case (key, opt) => @@ -152,7 +152,7 @@ object InteractiveSession extends Logging { request.jars, request.numExecutors, request.pyFiles, - request.queue, + request.queue.orElse(livyConf.getYarnQueue()), mockApp) } diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index 401a8beb1..7fe7c2192 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -138,6 +138,83 @@ class BatchSessionSpec }) should be (true) } + it("should inherit the default YARN queue from LivyConf when request queue is empty") { + val req = new CreateBatchRequest() + req.file = script.toString + req.queue = None // Explicitly empty + req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path")) + + // Set default queue in LivyConf configuration + val conf = new LivyConf() + .set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir")) + .set(LivyConf.SPARK_YARN_QUEUE, "livy-default-batch-queue") + + val accessManager = new AccessManager(conf) + val mockApp = mock[SparkApp] + + val batch = BatchSession.create( + id = 10, + name = None, + request = req, + livyConf = conf, + accessManager = accessManager, + owner = null, + proxyUser = None, + sessionStore = sessionStore, + mockApp = Some(mockApp) + ) + + // Verify that the batch session structure captured the fallback queue configuration + batch.queue shouldBe Some("livy-default-batch-queue") + } + + it("should prioritize user-specified request queue over LivyConf configuration") { + val req = new CreateBatchRequest() + req.file = script.toString + req.queue = Some("user-custom-batch-queue") // Explicitly requested by user + req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path")) + + val conf = new LivyConf() + .set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir")) + .set(LivyConf.SPARK_YARN_QUEUE, "livy-default-batch-queue") + + val accessManager = new AccessManager(conf) + val mockApp = mock[SparkApp] + + val batch = BatchSession.create( + id = 20, + name = None, + request = req, + livyConf = conf, + accessManager = accessManager, + owner = null, + proxyUser = None, + sessionStore = sessionStore, + mockApp = Some(mockApp) + ) + + // Verify user context takes absolute priority over fallback definition + batch.queue shouldBe Some("user-custom-batch-queue") + } + + it("should populate the default YARN queue configuration during session recovery") { + val conf = new LivyConf().set(LivyConf.SPARK_YARN_QUEUE, "livy-recovered-batch-queue") + val mockApp = mock[SparkApp] + val metadata = BatchRecoveryMetadata( + id = 99, + name = Some("Recovery Queue Test"), + appId = None, + appTag = "livy-batch-99-abc", + owner = "systest", + proxyUser = None + ) + + val batch = BatchSession.recover(metadata, conf, sessionStore, Some(mockApp)) + + // Verify that the recovered session has populated its queue with the configuration fallback + batch.queue shouldBe Some("livy-recovered-batch-queue") + } + def testRecoverSession(name: Option[String]): Unit = { val conf = new LivyConf() val req = new CreateBatchRequest() diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index e7d651f89..d443dea6b 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ -322,4 +322,71 @@ class InteractiveSessionSpec extends FunSpec s.logLines().mkString should include("RSCDriver URI is unknown") } } + + describe("InteractiveSession") { + it("should inherit the default YARN queue from LivyConf when request queue is empty") { + // Create a clean instance of LivyConf for this isolated check + val testLivyConf = new LivyConf() + .set(LivyConf.REPL_JARS, "dummy.jar") + .set(LivyConf.SPARK_YARN_QUEUE, "livy-default-queue") + + val req = new CreateInteractiveRequest() + req.kind = Spark + req.queue = None // Explicitly empty + req.conf = Map(RSCConf.Entry.LIVY_JARS.key() -> "") + + // Create a mock RSCClient to avoid spawning a real background spark-submit process + val mockClient = Some(mock[RSCClient]) + + val s = InteractiveSession.create( + id = 101, + name = None, + owner = "systest", + proxyUser = None, + livyConf = testLivyConf, + accessManager = accessManager, + request = req, + sessionStore = mock[SessionStore], + ttl = None, + idleTimeout = None, + mockApp = None, + mockClient = mockClient + ) + + // Verify that the internal session state correctly holds the fallback queue string + s.queue shouldBe Some("livy-default-queue") + } + + it("should prioritize user-specified request queue over LivyConf global configuration") { + val testLivyConf = new LivyConf() + .set(LivyConf.REPL_JARS, "dummy.jar") + .set(LivyConf.SPARK_YARN_QUEUE, "livy-default-queue") + + val req = new CreateInteractiveRequest() + req.kind = Spark + req.queue = Some("user-custom-queue") // Explicitly provided by request + req.conf = Map(RSCConf.Entry.LIVY_JARS.key() -> "") + + val mockClient = Some(mock[RSCClient]) + + val s = InteractiveSession.create( + id = 102, + name = None, + owner = "systest", + proxyUser = None, + livyConf = testLivyConf, + accessManager = accessManager, + request = req, + sessionStore = mock[SessionStore], + ttl = None, + idleTimeout = None, + mockApp = None, + mockClient = mockClient + ) + + // Verify that user context takes absolute priority over fallback definitions + s.queue shouldBe Some("user-custom-queue") + } + + } } From e142eef7b840b2d6359a56f78790ffea685b8efe Mon Sep 17 00:00:00 2001 From: "nilesh.rathi" Date: Wed, 24 Jun 2026 11:07:47 +0530 Subject: [PATCH 2/4] documented config to livy.conf.template and few fixes --- conf/livy.conf.template | 4 ++++ .../livy/server/batch/BatchSession.scala | 2 +- .../livy/server/batch/BatchSessionSpec.scala | 20 +------------------ .../livy/sessions/SessionManagerSpec.scala | 2 +- 4 files changed, 7 insertions(+), 21 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 0299dca7f..30a3a1f40 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -47,6 +47,10 @@ # What spark deploy mode Livy sessions should use. # livy.spark.deploy-mode = +# What default YARN queue Livy sessions should use if not specified in the client request. +# By default, it is null and falls back to the global Hadoop cluster default queue. +# livy.spark.yarn.queue = default + # Configure Livy server http request and response header size. # livy.server.request-header.size = 131072 # livy.server.response-header.size = 131072 diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 05faba83a..688fe93ef 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -40,7 +40,7 @@ case class BatchRecoveryMetadata( appTag: String, owner: String, proxyUser: Option[String], - queue: Option[String] = None, + queue: Option[String], version: Int = 1) extends RecoveryMetadata diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index 7fe7c2192..5f5603cb0 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -197,30 +197,12 @@ class BatchSessionSpec batch.queue shouldBe Some("user-custom-batch-queue") } - it("should populate the default YARN queue configuration during session recovery") { - val conf = new LivyConf().set(LivyConf.SPARK_YARN_QUEUE, "livy-recovered-batch-queue") - val mockApp = mock[SparkApp] - val metadata = BatchRecoveryMetadata( - id = 99, - name = Some("Recovery Queue Test"), - appId = None, - appTag = "livy-batch-99-abc", - owner = "systest", - proxyUser = None - ) - - val batch = BatchSession.recover(metadata, conf, sessionStore, Some(mockApp)) - - // Verify that the recovered session has populated its queue with the configuration fallback - batch.queue shouldBe Some("livy-recovered-batch-queue") - } - def testRecoverSession(name: Option[String]): Unit = { val conf = new LivyConf() val req = new CreateBatchRequest() val name = Some("Test Batch Session") val mockApp = mock[SparkApp] - val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None) + val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None, None) val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp)) batch.state shouldBe (SessionState.Recovering) diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala index 363b01f89..1672b9759 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala @@ -215,7 +215,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit implicit def executor: ExecutionContext = ExecutionContext.global def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = { - BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None) + BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None, None) } def mockSession(id: Int): BatchSession = { From ba7b01a2e9dafd136c45fbce22a3caadba1f8848 Mon Sep 17 00:00:00 2001 From: "nilesh.rathi" Date: Thu, 25 Jun 2026 11:38:20 +0530 Subject: [PATCH 3/4] Updated and fixed the reviews suggestions --- .../scala/org/apache/livy/server/batch/BatchSession.scala | 5 ++--- .../org/apache/livy/server/batch/BatchSessionServlet.scala | 3 +-- .../org/apache/livy/server/batch/BatchSessionSpec.scala | 2 +- .../scala/org/apache/livy/sessions/SessionManagerSpec.scala | 2 +- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 688fe93ef..0019a5810 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -40,7 +40,6 @@ case class BatchRecoveryMetadata( appTag: String, owner: String, proxyUser: Option[String], - queue: Option[String], version: Int = 1) extends RecoveryMetadata @@ -139,7 +138,7 @@ object BatchSession extends Logging { m.owner, m.proxyUser, sessionStore, - m.queue, + None, mockApp.map { m => (_: BatchSession) => m }.getOrElse { s => SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s)) }) @@ -208,5 +207,5 @@ class BatchSession( override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo } override def recoveryMetadata: RecoveryMetadata = - BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser, queue) + BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser) } diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala index 7cd4901ab..d14e649f0 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala @@ -33,7 +33,6 @@ case class BatchSessionView( state: String, appId: Option[String], appInfo: AppInfo, - queue: Option[String], log: Seq[String]) class BatchSessionServlet( @@ -77,7 +76,7 @@ class BatchSessionServlet( Nil } BatchSessionView(session.id, session.name, session.owner, session.proxyUser, - session.state.toString, session.appId, session.appInfo, session.queue, logs) + session.state.toString, session.appId, session.appInfo, logs) } } diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index 5f5603cb0..d22dea256 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -202,7 +202,7 @@ class BatchSessionSpec val req = new CreateBatchRequest() val name = Some("Test Batch Session") val mockApp = mock[SparkApp] - val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None, None) + val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None) val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp)) batch.state shouldBe (SessionState.Recovering) diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala index 1672b9759..363b01f89 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala @@ -215,7 +215,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit implicit def executor: ExecutionContext = ExecutionContext.global def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = { - BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None, None) + BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None) } def mockSession(id: Int): BatchSession = { From 833b84d5265f0103f5345f7bcb3a6b6799159a52 Mon Sep 17 00:00:00 2001 From: "nilesh.rathi" Date: Tue, 23 Jun 2026 15:54:27 +0530 Subject: [PATCH 4/4] [LIVY-1059] Add livy.spark.yarn.queue configuration to set default YARN queue --- conf/livy.conf.template | 4 ++ .../main/scala/org/apache/livy/LivyConf.scala | 4 ++ .../livy/server/batch/BatchSession.scala | 5 +- .../interactive/InteractiveSession.scala | 4 +- .../livy/server/batch/BatchSessionSpec.scala | 59 ++++++++++++++++ .../interactive/InteractiveSessionSpec.scala | 67 +++++++++++++++++++ 6 files changed, 140 insertions(+), 3 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 0299dca7f..30a3a1f40 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -47,6 +47,10 @@ # What spark deploy mode Livy sessions should use. # livy.spark.deploy-mode = +# What default YARN queue Livy sessions should use if not specified in the client request. +# By default, it is null and falls back to the global Hadoop cluster default queue. +# livy.spark.yarn.queue = default + # Configure Livy server http request and response header size. # livy.server.request-header.size = 131072 # livy.server.response-header.size = 131072 diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 006ab5dff..8dbfdd023 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -41,6 +41,7 @@ object LivyConf { val TEST_MODE = ClientConf.TEST_MODE + val SPARK_YARN_QUEUE = Entry("livy.spark.yarn.queue", null) val SPARK_HOME = Entry("livy.server.spark-home", null) val LIVY_SPARK_MASTER = Entry("livy.spark.master", "local") val LIVY_SPARK_DEPLOY_MODE = Entry("livy.spark.deploy-mode", null) @@ -485,6 +486,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { /** Return the spark master Livy sessions should use. */ def sparkMaster(): String = get(LIVY_SPARK_MASTER) + /** Return the value of spark yarn queue. */ + def getYarnQueue(): Option[String] = Option(get(SPARK_YARN_QUEUE)) + /** Return the path to the spark-submit executable. */ def sparkSubmit(): String = { sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 8b64a0398..0019a5810 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -83,7 +83,7 @@ object BatchSession extends Logging { request.executorMemory.foreach(builder.executorMemory) request.executorCores.foreach(builder.executorCores) request.numExecutors.foreach(builder.numExecutors) - request.queue.foreach(builder.queue) + request.queue.orElse(livyConf.getYarnQueue()).foreach(builder.queue) request.name.foreach(builder.name) sessionStore.save(BatchSession.RECOVERY_SESSION_TYPE, s.recoveryMetadata) @@ -120,6 +120,7 @@ object BatchSession extends Logging { owner, impersonatedUser, sessionStore, + request.queue.orElse(livyConf.getYarnQueue()), mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp)) } @@ -137,6 +138,7 @@ object BatchSession extends Logging { m.owner, m.proxyUser, sessionStore, + None, mockApp.map { m => (_: BatchSession) => m }.getOrElse { s => SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s)) }) @@ -152,6 +154,7 @@ class BatchSession( owner: String, override val proxyUser: Option[String], sessionStore: SessionStore, + val queue: Option[String], sparkApp: BatchSession => SparkApp) extends Session(id, name, owner, livyConf) with SparkAppListener { import BatchSession._ diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 0667b718c..c6b62309d 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -107,7 +107,7 @@ object InteractiveSession extends Logging { SparkLauncher.EXECUTOR_MEMORY -> request.executorMemory.map(_.toString), "spark.executor.instances" -> request.numExecutors.map(_.toString), "spark.app.name" -> request.name.map(_.toString), - "spark.yarn.queue" -> request.queue + "spark.yarn.queue" -> request.queue.orElse(livyConf.getYarnQueue()) ) userOpts.foreach { case (key, opt) => @@ -152,7 +152,7 @@ object InteractiveSession extends Logging { request.jars, request.numExecutors, request.pyFiles, - request.queue, + request.queue.orElse(livyConf.getYarnQueue()), mockApp) } diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index 401a8beb1..d22dea256 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -138,6 +138,65 @@ class BatchSessionSpec }) should be (true) } + it("should inherit the default YARN queue from LivyConf when request queue is empty") { + val req = new CreateBatchRequest() + req.file = script.toString + req.queue = None // Explicitly empty + req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path")) + + // Set default queue in LivyConf configuration + val conf = new LivyConf() + .set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir")) + .set(LivyConf.SPARK_YARN_QUEUE, "livy-default-batch-queue") + + val accessManager = new AccessManager(conf) + val mockApp = mock[SparkApp] + + val batch = BatchSession.create( + id = 10, + name = None, + request = req, + livyConf = conf, + accessManager = accessManager, + owner = null, + proxyUser = None, + sessionStore = sessionStore, + mockApp = Some(mockApp) + ) + + // Verify that the batch session structure captured the fallback queue configuration + batch.queue shouldBe Some("livy-default-batch-queue") + } + + it("should prioritize user-specified request queue over LivyConf configuration") { + val req = new CreateBatchRequest() + req.file = script.toString + req.queue = Some("user-custom-batch-queue") // Explicitly requested by user + req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path")) + + val conf = new LivyConf() + .set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir")) + .set(LivyConf.SPARK_YARN_QUEUE, "livy-default-batch-queue") + + val accessManager = new AccessManager(conf) + val mockApp = mock[SparkApp] + + val batch = BatchSession.create( + id = 20, + name = None, + request = req, + livyConf = conf, + accessManager = accessManager, + owner = null, + proxyUser = None, + sessionStore = sessionStore, + mockApp = Some(mockApp) + ) + + // Verify user context takes absolute priority over fallback definition + batch.queue shouldBe Some("user-custom-batch-queue") + } + def testRecoverSession(name: Option[String]): Unit = { val conf = new LivyConf() val req = new CreateBatchRequest() diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index e7d651f89..d443dea6b 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ -322,4 +322,71 @@ class InteractiveSessionSpec extends FunSpec s.logLines().mkString should include("RSCDriver URI is unknown") } } + + describe("InteractiveSession") { + it("should inherit the default YARN queue from LivyConf when request queue is empty") { + // Create a clean instance of LivyConf for this isolated check + val testLivyConf = new LivyConf() + .set(LivyConf.REPL_JARS, "dummy.jar") + .set(LivyConf.SPARK_YARN_QUEUE, "livy-default-queue") + + val req = new CreateInteractiveRequest() + req.kind = Spark + req.queue = None // Explicitly empty + req.conf = Map(RSCConf.Entry.LIVY_JARS.key() -> "") + + // Create a mock RSCClient to avoid spawning a real background spark-submit process + val mockClient = Some(mock[RSCClient]) + + val s = InteractiveSession.create( + id = 101, + name = None, + owner = "systest", + proxyUser = None, + livyConf = testLivyConf, + accessManager = accessManager, + request = req, + sessionStore = mock[SessionStore], + ttl = None, + idleTimeout = None, + mockApp = None, + mockClient = mockClient + ) + + // Verify that the internal session state correctly holds the fallback queue string + s.queue shouldBe Some("livy-default-queue") + } + + it("should prioritize user-specified request queue over LivyConf global configuration") { + val testLivyConf = new LivyConf() + .set(LivyConf.REPL_JARS, "dummy.jar") + .set(LivyConf.SPARK_YARN_QUEUE, "livy-default-queue") + + val req = new CreateInteractiveRequest() + req.kind = Spark + req.queue = Some("user-custom-queue") // Explicitly provided by request + req.conf = Map(RSCConf.Entry.LIVY_JARS.key() -> "") + + val mockClient = Some(mock[RSCClient]) + + val s = InteractiveSession.create( + id = 102, + name = None, + owner = "systest", + proxyUser = None, + livyConf = testLivyConf, + accessManager = accessManager, + request = req, + sessionStore = mock[SessionStore], + ttl = None, + idleTimeout = None, + mockApp = None, + mockClient = mockClient + ) + + // Verify that user context takes absolute priority over fallback definitions + s.queue shouldBe Some("user-custom-queue") + } + + } }