Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions access-control-service/LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ Scala/Java jars:
- com.thesamet.scalapb.scalapb-runtime_2.13-0.11.20.jar
- com.typesafe.config-1.4.6.jar
- com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar
- com.zaxxer.HikariCP-5.1.0.jar
- io.dropwizard.dropwizard-auth-4.0.7.jar
- io.dropwizard.dropwizard-configuration-4.0.7.jar
- io.dropwizard.dropwizard-core-4.0.7.jar
Expand Down
1 change: 1 addition & 0 deletions common/dao/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,5 @@ libraryDependencies ++= Seq(

libraryDependencies ++= Seq(
"org.postgresql" % "postgresql" % "42.7.4",
"com.zaxxer" % "HikariCP" % "5.1.0"
Comment thread
Ma77Ball marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the latest version is HikariCP-7.0.2, any particular reason to stay on v5.1.0?

)
44 changes: 30 additions & 14 deletions common/dao/src/main/scala/org/apache/texera/dao/SqlServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,59 @@

package org.apache.texera.dao

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import org.jooq.impl.DSL
import org.jooq.{DSLContext, SQLDialect}
import org.postgresql.ds.PGSimpleDataSource

/**
* SqlServer class that manages a connection to a PostgreSQL database using jOOQ.
*
* Uses a HikariCP connection pool so that every jOOQ query borrows a pre-authenticated
* connection from the pool rather than opening a new TCP + SCRAM handshake each time.
*
* WARNING: Do not cache the DSLContext returned by `createDSLContext()` in a val or lazy val.
* During testing, `MockTexeraDB` replaces the SqlServer instance between test classes.
* A cached DSLContext will hold a stale reference to a dead database connection from a previous test class,
* causing "Connection refused" errors when tests run together.
* Use `def` to ensure the connection is looked up each time.
*
* @param url The database connection URL.
* @param user The username for authenticating with the database.
* @param url The JDBC connection URL.
* @param user The username for authenticating with the database.
* @param password The password for authenticating with the database.
*/
class SqlServer private (url: String, user: String, password: String) {
val SQL_DIALECT: SQLDialect = SQLDialect.POSTGRES
private val dataSource: PGSimpleDataSource = new PGSimpleDataSource()
var context: DSLContext = {
dataSource.setUrl(url)
dataSource.setUser(user)
dataSource.setPassword(password)
dataSource.setConnectTimeout(5)
DSL.using(dataSource, SQL_DIALECT)

private val hikariConfig: HikariConfig = {
val cfg = new HikariConfig()
cfg.setJdbcUrl(url)
cfg.setUsername(user)
cfg.setPassword(password)
cfg.setPoolName("texera-hikari")
cfg.setMaximumPoolSize(10)
cfg.setMinimumIdle(2)
// How long a caller waits for a connection before throwing (ms)
cfg.setConnectionTimeout(30000)
// How long an idle connection stays in the pool before being retired (ms)
cfg.setIdleTimeout(600000)
// Maximum lifetime of any connection in the pool (ms); must be < PostgreSQL's idle timeout
cfg.setMaxLifetime(1800000)
Comment thread
Ma77Ball marked this conversation as resolved.
cfg
}

private val dataSource: HikariDataSource = new HikariDataSource(hikariConfig)

var context: DSLContext = DSL.using(dataSource, SQL_DIALECT)

def createDSLContext(): DSLContext = context

def replaceDSLContext(newContext: DSLContext): Unit = {
context = newContext
}

def close(): Unit = {
if (!dataSource.isClosed) dataSource.close()
}
}

object SqlServer {
Expand All @@ -68,10 +88,6 @@ object SqlServer {
instance.get
}

def clearInstance(): Unit = {
instance = None
}

/**
* A utility function for create a transaction block using given sql context
* @param dsl the sql context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ trait MockTexeraDB {
value.close()
dbInstance = None
dslContext = None
SqlServer.clearInstance()
case None =>
// do nothing
}
Expand Down
196 changes: 196 additions & 0 deletions common/dao/src/test/scala/org/apache/texera/dao/SqlServerSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.dao

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import org.jooq.impl.DSL
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll}

class SqlServerSpec
extends AnyFlatSpec
with Matchers
with BeforeAndAfterAll
with MockTexeraDB {

override def beforeAll(): Unit = initializeDBAndReplaceDSLContext()
override def afterAll(): Unit = shutdownDB()

// -------------------------------------------------------------------------
// SqlServer.withTransaction
//
// getDSLContext is backed by the embedded Postgres DataSource, so each
// top-level query borrows a connection from the pool. withTransaction
// binds a single connection for the duration of the block, making rollback
// and commit behaviour fully observable.
// -------------------------------------------------------------------------

"SqlServer.withTransaction" should "return the value produced by the block" in {
val result = SqlServer.withTransaction(getDSLContext) { _ => 42 }
result shouldBe 42
}

it should "commit the block's work so subsequent queries observe the changes" in {
// SELECT 1 is a lightweight live query; completing without error confirms
// the transaction committed and the connection was returned cleanly.
val result = SqlServer.withTransaction(getDSLContext) { ctx =>
ctx.selectOne().fetchOne().value1()
}
result shouldBe 1
}

it should "re-throw the exception when the block throws" in {
val boom = new RuntimeException("intentional failure")
val thrown = intercept[RuntimeException] {
SqlServer.withTransaction(getDSLContext) { _ => throw boom }
}
thrown.getMessage should include("intentional failure")
}

it should "roll back all DML in the block when an exception is thrown" in {
// A permanent (non-TEMP) table is used so every connection from the pool
// can see it; TEMP tables are session-scoped and would be invisible across
// pool connections.
val dsl = getDSLContext
dsl.execute("CREATE TABLE IF NOT EXISTS _txn_rollback_test (v INT)")
try {
intercept[RuntimeException] {
SqlServer.withTransaction(dsl) { ctx =>
ctx.execute("INSERT INTO _txn_rollback_test VALUES (99)")
throw new RuntimeException("force rollback")
}
}
// The INSERT was inside the rolled-back transaction, so the table must
// still be empty.
dsl.fetchCount(DSL.table(DSL.name("_txn_rollback_test"))) shouldBe 0
} finally {
dsl.execute("DROP TABLE IF EXISTS _txn_rollback_test")
}
}

it should "support nested return types beyond Int" in {
val result = SqlServer.withTransaction(getDSLContext) { ctx =>
ctx.selectOne().fetchOne().value1().toString
}
result shouldBe "1"
}

// -------------------------------------------------------------------------
// HikariCP pool lifecycle and configuration
//
// These tests create their own HikariDataSource against the embedded Postgres
// instance so they can drive the pool directly, independently of the
// DSLContext replacement that MockTexeraDB applies for its own queries.
// -------------------------------------------------------------------------

private def buildPool(maxSize: Int = 5, minIdle: Int = 1, poolName: String = "spec-pool"): HikariDataSource = {
// Use the default "postgres" database so no schema setup is needed.
val jdbcUrl = getDBInstance.getJdbcUrl("postgres", "postgres")
val cfg = new HikariConfig()
cfg.setJdbcUrl(jdbcUrl)
cfg.setUsername("postgres")
cfg.setPassword("")
cfg.setPoolName(poolName)
cfg.setMaximumPoolSize(maxSize)
cfg.setMinimumIdle(minIdle)
cfg.setConnectionTimeout(5000)
new HikariDataSource(cfg)
}

"HikariCP pool" should "provide a usable connection that can execute queries" in {
val ds = buildPool()
try {
val conn = ds.getConnection
try {
val rs = conn.prepareStatement("SELECT 1").executeQuery()
rs.next() shouldBe true
rs.getInt(1) shouldBe 1
} finally conn.close()
} finally ds.close()
}

it should "apply the configured pool name" in {
val ds = buildPool(poolName = "my-named-pool")
try {
ds.getHikariConfigMXBean.getPoolName shouldBe "my-named-pool"
} finally ds.close()
}

it should "apply the configured maximum pool size" in {
val ds = buildPool(maxSize = 7)
try {
ds.getHikariConfigMXBean.getMaximumPoolSize shouldBe 7
} finally ds.close()
}

it should "apply the configured minimum idle connections" in {
val ds = buildPool(minIdle = 2)
try {
ds.getHikariConfigMXBean.getMinimumIdle shouldBe 2
} finally ds.close()
}

it should "count a borrowed connection as active" in {
val ds = buildPool()
try {
val conn = ds.getConnection
try {
ds.getHikariPoolMXBean.getActiveConnections should be >= 1
} finally conn.close()
} finally ds.close()
}

it should "decrement active count and increment idle count once a connection is returned" in {
val ds = buildPool()
try {
val conn = ds.getConnection
conn.close()
ds.getHikariPoolMXBean.getActiveConnections shouldBe 0
ds.getHikariPoolMXBean.getIdleConnections should be >= 1
} finally ds.close()
}

it should "allow up to the maximum pool size connections to be borrowed concurrently" in {
val ds = buildPool(maxSize = 3)
try {
val c1 = ds.getConnection
val c2 = ds.getConnection
val c3 = ds.getConnection
ds.getHikariPoolMXBean.getActiveConnections shouldBe 3
c1.close(); c2.close(); c3.close()
} finally ds.close()
}

it should "report isClosed as false while open and true after close" in {
val ds = buildPool()
ds.isClosed shouldBe false
ds.close()
ds.isClosed shouldBe true
}

it should "reject getConnection after the pool has been closed" in {
val ds = buildPool()
ds.close()
// HikariCP throws an SQLException (wrapped as RuntimeException by the pool)
// when a caller tries to borrow from a closed pool.
assertThrows[Exception](ds.getConnection)
}
}
1 change: 1 addition & 0 deletions computing-unit-managing-service/LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ Scala/Java jars:
- com.typesafe.play.play-functional_2.13-2.10.6.jar
- com.typesafe.play.play-json_2.13-2.10.6.jar
- com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar
- com.zaxxer.HikariCP-5.1.0.jar
- commons-beanutils.commons-beanutils-1.9.4.jar
- commons-cli.commons-cli-1.2.jar
- commons-codec.commons-codec-1.17.1.jar
Expand Down
1 change: 1 addition & 0 deletions config-service/LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ Scala/Java jars:
- com.thesamet.scalapb.scalapb-runtime_2.13-0.11.20.jar
- com.typesafe.config-1.4.6.jar
- com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar
- com.zaxxer.HikariCP-5.1.0.jar
- io.dropwizard.dropwizard-auth-4.0.7.jar
- io.dropwizard.dropwizard-configuration-4.0.7.jar
- io.dropwizard.dropwizard-core-4.0.7.jar
Expand Down
1 change: 1 addition & 0 deletions file-service/LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ Scala/Java jars:
- com.thesamet.scalapb.scalapb-runtime_2.13-0.11.20.jar
- com.typesafe.config-1.4.6.jar
- com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar
- com.zaxxer.HikariCP-5.1.0.jar
- commons-beanutils.commons-beanutils-1.9.4.jar
- commons-cli.commons-cli-1.2.jar
- commons-codec.commons-codec-1.17.1.jar
Expand Down
1 change: 1 addition & 0 deletions workflow-compiling-service/LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ Scala/Java jars:
- com.thesamet.scalapb.scalapb-runtime_2.13-0.11.20.jar
- com.typesafe.config-1.4.6.jar
- com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar
- com.zaxxer.HikariCP-5.1.0.jar
- com.univocity.univocity-parsers-2.9.1.jar
- commons-beanutils.commons-beanutils-1.9.4.jar
- commons-cli.commons-cli-1.2.jar
Expand Down
Loading