From a712fc82f4830f3bdbbcc846da6e5c517ab8c4f0 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Sun, 3 May 2026 00:39:21 -0700 Subject: [PATCH 1/4] Adding a connection pool to reduce the time spent opening connections to Postgres --- common/dao/build.sbt | 1 + .../org/apache/texera/dao/SqlServer.scala | 41 ++++++++++++++----- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/common/dao/build.sbt b/common/dao/build.sbt index 6b352d6975c..7500f21d7d8 100644 --- a/common/dao/build.sbt +++ b/common/dao/build.sbt @@ -175,4 +175,5 @@ libraryDependencies ++= Seq( libraryDependencies ++= Seq( "org.postgresql" % "postgresql" % "42.7.4", + "com.zaxxer" % "HikariCP" % "5.1.0" ) diff --git a/common/dao/src/main/scala/org/apache/texera/dao/SqlServer.scala b/common/dao/src/main/scala/org/apache/texera/dao/SqlServer.scala index 942bac4f07a..82d5ba970f1 100644 --- a/common/dao/src/main/scala/org/apache/texera/dao/SqlServer.scala +++ b/common/dao/src/main/scala/org/apache/texera/dao/SqlServer.scala @@ -19,39 +19,59 @@ package org.apache.texera.dao +import com.zaxxer.hikari.{HikariConfig, HikariDataSource} import org.jooq.impl.DSL import org.jooq.{DSLContext, SQLDialect} -import org.postgresql.ds.PGSimpleDataSource /** * SqlServer class that manages a connection to a PostgreSQL database using jOOQ. * + * Uses a HikariCP connection pool so that every jOOQ query borrows a pre-authenticated + * connection from the pool rather than opening a new TCP + SCRAM handshake each time. + * * WARNING: Do not cache the DSLContext returned by `createDSLContext()` in a val or lazy val. * During testing, `MockTexeraDB` replaces the SqlServer instance between test classes. * A cached DSLContext will hold a stale reference to a dead database connection from a previous test class, * causing "Connection refused" errors when tests run together. * Use `def` to ensure the connection is looked up each time. * - * @param url The database connection URL. 
- * @param user The username for authenticating with the database. + * @param url The JDBC connection URL. + * @param user The username for authenticating with the database. * @param password The password for authenticating with the database. */ class SqlServer private (url: String, user: String, password: String) { val SQL_DIALECT: SQLDialect = SQLDialect.POSTGRES - private val dataSource: PGSimpleDataSource = new PGSimpleDataSource() - var context: DSLContext = { - dataSource.setUrl(url) - dataSource.setUser(user) - dataSource.setPassword(password) - dataSource.setConnectTimeout(5) - DSL.using(dataSource, SQL_DIALECT) + + private val hikariConfig: HikariConfig = { + val cfg = new HikariConfig() + cfg.setJdbcUrl(url) + cfg.setUsername(user) + cfg.setPassword(password) + cfg.setPoolName("texera-hikari") + cfg.setMaximumPoolSize(10) + cfg.setMinimumIdle(2) + // How long a caller waits for a connection before throwing (ms) + cfg.setConnectionTimeout(30000) + // How long an idle connection stays in the pool before being retired (ms) + cfg.setIdleTimeout(600000) + // Maximum lifetime of any connection in the pool (ms); keep it shorter than any connection time limit imposed by the database or network infrastructure + cfg.setMaxLifetime(1800000) + cfg } + private val dataSource: HikariDataSource = new HikariDataSource(hikariConfig) + + var context: DSLContext = DSL.using(dataSource, SQL_DIALECT) + def createDSLContext(): DSLContext = context def replaceDSLContext(newContext: DSLContext): Unit = { context = newContext } + + def close(): Unit = { + if (!dataSource.isClosed) dataSource.close() + } } object SqlServer { @@ -69,6 +89,7 @@ object SqlServer { } def clearInstance(): Unit = { + instance.foreach(_.close()) instance = None } From b5c9ec84151df11f93a234baf8d64cf2d7164d78 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Mon, 4 May 2026 12:25:33 -0700 Subject: [PATCH 2/4] remove clearInstance --- .../dao/src/main/scala/org/apache/texera/dao/SqlServer.scala | 5 ----- .../src/test/scala/org/apache/texera/dao/MockTexeraDB.scala | 
1 - 2 files changed, 6 deletions(-) diff --git a/common/dao/src/main/scala/org/apache/texera/dao/SqlServer.scala b/common/dao/src/main/scala/org/apache/texera/dao/SqlServer.scala index 82d5ba970f1..09f5add2a61 100644 --- a/common/dao/src/main/scala/org/apache/texera/dao/SqlServer.scala +++ b/common/dao/src/main/scala/org/apache/texera/dao/SqlServer.scala @@ -88,11 +88,6 @@ object SqlServer { instance.get } - def clearInstance(): Unit = { - instance.foreach(_.close()) - instance = None - } - /** * A utility function for create a transaction block using given sql context * @param dsl the sql context diff --git a/common/dao/src/test/scala/org/apache/texera/dao/MockTexeraDB.scala b/common/dao/src/test/scala/org/apache/texera/dao/MockTexeraDB.scala index e13ff696cfa..3ae97b62e03 100644 --- a/common/dao/src/test/scala/org/apache/texera/dao/MockTexeraDB.scala +++ b/common/dao/src/test/scala/org/apache/texera/dao/MockTexeraDB.scala @@ -67,7 +67,6 @@ trait MockTexeraDB { value.close() dbInstance = None dslContext = None - SqlServer.clearInstance() case None => // do nothing } From b242517e9aca10274123c0ac3fc9b5747509f08d Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Mon, 4 May 2026 12:32:41 -0700 Subject: [PATCH 3/4] Added HikariCP to all the LICENSE-binary files under Apache2.0 --- access-control-service/LICENSE-binary | 1 + computing-unit-managing-service/LICENSE-binary | 1 + config-service/LICENSE-binary | 1 + file-service/LICENSE-binary | 1 + workflow-compiling-service/LICENSE-binary | 1 + 5 files changed, 5 insertions(+) diff --git a/access-control-service/LICENSE-binary b/access-control-service/LICENSE-binary index 9502e28b215..bb4a8a78b69 100644 --- a/access-control-service/LICENSE-binary +++ b/access-control-service/LICENSE-binary @@ -247,6 +247,7 @@ Scala/Java jars: - com.thesamet.scalapb.scalapb-runtime_2.13-0.11.20.jar - com.typesafe.config-1.4.6.jar - com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar + - com.zaxxer.HikariCP-5.1.0.jar - 
io.dropwizard.dropwizard-auth-4.0.7.jar - io.dropwizard.dropwizard-configuration-4.0.7.jar - io.dropwizard.dropwizard-core-4.0.7.jar diff --git a/computing-unit-managing-service/LICENSE-binary b/computing-unit-managing-service/LICENSE-binary index 89d1edddb76..97dd05bcf03 100644 --- a/computing-unit-managing-service/LICENSE-binary +++ b/computing-unit-managing-service/LICENSE-binary @@ -274,6 +274,7 @@ Scala/Java jars: - com.typesafe.play.play-functional_2.13-2.10.6.jar - com.typesafe.play.play-json_2.13-2.10.6.jar - com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar + - com.zaxxer.HikariCP-5.1.0.jar - commons-beanutils.commons-beanutils-1.9.4.jar - commons-cli.commons-cli-1.2.jar - commons-codec.commons-codec-1.17.1.jar diff --git a/config-service/LICENSE-binary b/config-service/LICENSE-binary index de970a5394d..956179e95ce 100644 --- a/config-service/LICENSE-binary +++ b/config-service/LICENSE-binary @@ -247,6 +247,7 @@ Scala/Java jars: - com.thesamet.scalapb.scalapb-runtime_2.13-0.11.20.jar - com.typesafe.config-1.4.6.jar - com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar + - com.zaxxer.HikariCP-5.1.0.jar - io.dropwizard.dropwizard-auth-4.0.7.jar - io.dropwizard.dropwizard-configuration-4.0.7.jar - io.dropwizard.dropwizard-core-4.0.7.jar diff --git a/file-service/LICENSE-binary b/file-service/LICENSE-binary index 6d3da7abc3c..0a300085901 100644 --- a/file-service/LICENSE-binary +++ b/file-service/LICENSE-binary @@ -268,6 +268,7 @@ Scala/Java jars: - com.thesamet.scalapb.scalapb-runtime_2.13-0.11.20.jar - com.typesafe.config-1.4.6.jar - com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar + - com.zaxxer.HikariCP-5.1.0.jar - commons-beanutils.commons-beanutils-1.9.4.jar - commons-cli.commons-cli-1.2.jar - commons-codec.commons-codec-1.17.1.jar diff --git a/workflow-compiling-service/LICENSE-binary b/workflow-compiling-service/LICENSE-binary index 1f41bd024a4..78ca9388fc7 100644 --- a/workflow-compiling-service/LICENSE-binary +++ 
b/workflow-compiling-service/LICENSE-binary @@ -269,6 +269,7 @@ Scala/Java jars: - com.thesamet.scalapb.scalapb-runtime_2.13-0.11.20.jar - com.typesafe.config-1.4.6.jar - com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar + - com.zaxxer.HikariCP-5.1.0.jar - com.univocity.univocity-parsers-2.9.1.jar - commons-beanutils.commons-beanutils-1.9.4.jar - commons-cli.commons-cli-1.2.jar From c16c3e7aa4d32c55fd79a37bd32518365229cf90 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Mon, 4 May 2026 12:43:32 -0700 Subject: [PATCH 4/4] Added test cases for HikariCP --- .../org/apache/texera/dao/SqlServerSpec.scala | 196 ++++++++++++++++++ 1 file changed, 196 insertions(+) create mode 100644 common/dao/src/test/scala/org/apache/texera/dao/SqlServerSpec.scala diff --git a/common/dao/src/test/scala/org/apache/texera/dao/SqlServerSpec.scala b/common/dao/src/test/scala/org/apache/texera/dao/SqlServerSpec.scala new file mode 100644 index 00000000000..2a2432edf80 --- /dev/null +++ b/common/dao/src/test/scala/org/apache/texera/dao/SqlServerSpec.scala @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.dao + +import com.zaxxer.hikari.{HikariConfig, HikariDataSource} +import org.jooq.impl.DSL +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.{BeforeAndAfterAll} + +class SqlServerSpec + extends AnyFlatSpec + with Matchers + with BeforeAndAfterAll + with MockTexeraDB { + + override def beforeAll(): Unit = initializeDBAndReplaceDSLContext() + override def afterAll(): Unit = shutdownDB() + + // ------------------------------------------------------------------------- + // SqlServer.withTransaction + // + // getDSLContext is backed by the embedded Postgres DataSource, so each + // top-level query borrows a connection from the pool. withTransaction + // binds a single connection for the duration of the block, making rollback + // and commit behaviour fully observable. + // ------------------------------------------------------------------------- + + "SqlServer.withTransaction" should "return the value produced by the block" in { + val result = SqlServer.withTransaction(getDSLContext) { _ => 42 } + result shouldBe 42 + } + + it should "commit the block's work so subsequent queries observe the changes" in { + // SELECT 1 writes nothing, so this only checks that a query run inside the + // block returns its value; it does not by itself prove that writes commit. 
+ val result = SqlServer.withTransaction(getDSLContext) { ctx => + ctx.selectOne().fetchOne().value1() + } + result shouldBe 1 + } + + it should "re-throw the exception when the block throws" in { + val boom = new RuntimeException("intentional failure") + val thrown = intercept[RuntimeException] { + SqlServer.withTransaction(getDSLContext) { _ => throw boom } + } + thrown.getMessage should include("intentional failure") + } + + it should "roll back all DML in the block when an exception is thrown" in { + // A permanent (non-TEMP) table is used so every connection from the pool + // can see it; TEMP tables are session-scoped and would be invisible across + // pool connections. + val dsl = getDSLContext + dsl.execute("CREATE TABLE IF NOT EXISTS _txn_rollback_test (v INT)") + try { + intercept[RuntimeException] { + SqlServer.withTransaction(dsl) { ctx => + ctx.execute("INSERT INTO _txn_rollback_test VALUES (99)") + throw new RuntimeException("force rollback") + } + } + // The INSERT was inside the rolled-back transaction, so the table must + // still be empty. + dsl.fetchCount(DSL.table(DSL.name("_txn_rollback_test"))) shouldBe 0 + } finally { + dsl.execute("DROP TABLE IF EXISTS _txn_rollback_test") + } + } + + it should "support nested return types beyond Int" in { + val result = SqlServer.withTransaction(getDSLContext) { ctx => + ctx.selectOne().fetchOne().value1().toString + } + result shouldBe "1" + } + + // ------------------------------------------------------------------------- + // HikariCP pool lifecycle and configuration + // + // These tests create their own HikariDataSource against the embedded Postgres + // instance so they can drive the pool directly, independently of the + // DSLContext replacement that MockTexeraDB applies for its own queries. 
+ // ------------------------------------------------------------------------- + + private def buildPool(maxSize: Int = 5, minIdle: Int = 1, poolName: String = "spec-pool"): HikariDataSource = { + // Use the default "postgres" database so no schema setup is needed. + val jdbcUrl = getDBInstance.getJdbcUrl("postgres", "postgres") + val cfg = new HikariConfig() + cfg.setJdbcUrl(jdbcUrl) + cfg.setUsername("postgres") + cfg.setPassword("") + cfg.setPoolName(poolName) + cfg.setMaximumPoolSize(maxSize) + cfg.setMinimumIdle(minIdle) + cfg.setConnectionTimeout(5000) + new HikariDataSource(cfg) + } + + "HikariCP pool" should "provide a usable connection that can execute queries" in { + val ds = buildPool() + try { + val conn = ds.getConnection + try { + val rs = conn.prepareStatement("SELECT 1").executeQuery() + rs.next() shouldBe true + rs.getInt(1) shouldBe 1 + } finally conn.close() + } finally ds.close() + } + + it should "apply the configured pool name" in { + val ds = buildPool(poolName = "my-named-pool") + try { + ds.getHikariConfigMXBean.getPoolName shouldBe "my-named-pool" + } finally ds.close() + } + + it should "apply the configured maximum pool size" in { + val ds = buildPool(maxSize = 7) + try { + ds.getHikariConfigMXBean.getMaximumPoolSize shouldBe 7 + } finally ds.close() + } + + it should "apply the configured minimum idle connections" in { + val ds = buildPool(minIdle = 2) + try { + ds.getHikariConfigMXBean.getMinimumIdle shouldBe 2 + } finally ds.close() + } + + it should "count a borrowed connection as active" in { + val ds = buildPool() + try { + val conn = ds.getConnection + try { + ds.getHikariPoolMXBean.getActiveConnections should be >= 1 + } finally conn.close() + } finally ds.close() + } + + it should "decrement active count and increment idle count once a connection is returned" in { + val ds = buildPool() + try { + val conn = ds.getConnection + conn.close() + ds.getHikariPoolMXBean.getActiveConnections shouldBe 0 + 
+ ds.getHikariPoolMXBean.getIdleConnections should be >= 1 + } finally ds.close() + } + + it should "allow up to the maximum pool size connections to be borrowed concurrently" in { + val ds = buildPool(maxSize = 3) + try { + val c1 = ds.getConnection + val c2 = ds.getConnection + val c3 = ds.getConnection + ds.getHikariPoolMXBean.getActiveConnections shouldBe 3 + c1.close(); c2.close(); c3.close() + } finally ds.close() + } + + it should "report isClosed as false while open and true after close" in { + val ds = buildPool() + ds.isClosed shouldBe false + ds.close() + ds.isClosed shouldBe true + } + + it should "reject getConnection after the pool has been closed" in { + val ds = buildPool() + ds.close() + // HikariDataSource.getConnection throws a java.sql.SQLException when a + // caller tries to borrow from a data source that has been closed. + assertThrows[Exception](ds.getConnection) + } +}