diff --git a/README.md b/README.md index 6d33ece..0ae94e3 100644 --- a/README.md +++ b/README.md @@ -78,8 +78,8 @@ springOutboxPublisher.publish( **Using MySQL instead of PostgreSQL?** Replace `okapi-postgres` with `okapi-mysql` in your dependencies — no code changes needed. -> **Note:** `okapi-postgres` and `okapi-mysql` require Exposed ORM dependencies in your project. -> Spring and Kafka versions are not forced by okapi — you control them. +> **Note:** Spring and Kafka versions are not forced by okapi — you control them. +> Okapi uses plain JDBC internally — it works with any `PlatformTransactionManager` (JPA, JDBC, jOOQ, Exposed, etc.). ## How It Works @@ -103,6 +103,7 @@ graph BT MY[okapi-mysql] --> CORE HTTP[okapi-http] --> CORE KAFKA[okapi-kafka] --> CORE + EXP[okapi-exposed] --> CORE SPRING[okapi-spring-boot] --> CORE SPRING -.->|compileOnly| PG SPRING -.->|compileOnly| MY @@ -114,9 +115,10 @@ graph BT | Module | Purpose | |--------|---------| -| `okapi-core` | Transport/storage-agnostic orchestration, scheduling, retry policy | -| `okapi-postgres` | PostgreSQL storage via Exposed ORM (`FOR UPDATE SKIP LOCKED`) | -| `okapi-mysql` | MySQL 8+ storage via Exposed ORM | +| `okapi-core` | Transport/storage-agnostic orchestration, scheduling, retry policy, `ConnectionProvider` interface | +| `okapi-exposed` | Exposed ORM integration — `ExposedConnectionProvider`, `ExposedTransactionRunner`, `ExposedTransactionContextValidator` | +| `okapi-postgres` | PostgreSQL storage via plain JDBC (`FOR UPDATE SKIP LOCKED`) | +| `okapi-mysql` | MySQL 8+ storage via plain JDBC | | `okapi-http` | HTTP webhook delivery (JDK HttpClient) | | `okapi-kafka` | Kafka topic publishing | | `okapi-spring-boot` | Spring Boot autoconfiguration (auto-detects store and transports) | @@ -129,7 +131,7 @@ graph BT | Java | 21+ | Required | | Spring Boot | 3.5.x, 4.0.x | `okapi-spring-boot` module | | Kafka Clients | 3.9.x, 4.x | `okapi-kafka` — you provide `kafka-clients` | -| Exposed | 1.x | `okapi-postgres`, `okapi-mysql` — you provide Exposed | +| Exposed | 1.x | `okapi-exposed` module — for Ktor/standalone apps | ## Build diff --git a/okapi-bom/build.gradle.kts b/okapi-bom/build.gradle.kts index 588ba56..4b24a85 100644 --- a/okapi-bom/build.gradle.kts +++ b/okapi-bom/build.gradle.kts @@ -8,6 +8,7 @@ description = "BOM for consistent versioning of Okapi modules" dependencies { constraints { api(project(":okapi-core")) + api(project(":okapi-exposed")) api(project(":okapi-postgres")) api(project(":okapi-mysql")) api(project(":okapi-http")) diff --git a/okapi-core/build.gradle.kts b/okapi-core/build.gradle.kts index 691f8a8..a2fee38 100644 --- a/okapi-core/build.gradle.kts +++ b/okapi-core/build.gradle.kts @@ -7,10 +7,7 @@ description = "Core outbox abstractions and processing engine" dependencies { implementation(libs.slf4jApi) - compileOnly(libs.exposedJdbc) testImplementation(libs.kotestRunnerJunit5) testImplementation(libs.kotestAssertionsCore) - testImplementation(libs.exposedJdbc) - testImplementation(libs.h2) testRuntimeOnly(libs.slf4jSimple) } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ConnectionProvider.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ConnectionProvider.kt new file mode 100644 index 0000000..6586ffb --- /dev/null +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ConnectionProvider.kt @@ -0,0 +1,17 @@ +package com.softwaremill.okapi.core + +import java.sql.Connection + +/** + * Provides a JDBC [Connection] from the current transactional context. + * + * Implementations bridge okapi's [OutboxStore] with the caller's transaction mechanism: + * - `okapi-spring-boot`: uses `DataSourceUtils.getConnection()` — works with JPA, JDBC, jOOQ, MyBatis, Exposed + * - `okapi-exposed`: uses Exposed's `TransactionManager.current().connection` — for Ktor/standalone Exposed + * - Standalone: user-provided lambda wrapping a `DataSource` or `ThreadLocal` + * + * The returned connection is **borrowed** from the current transaction — the caller must NOT close it. + */ +fun interface ConnectionProvider { + fun getConnection(): Connection +} diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt index 3c72376..632451c 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt @@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicBoolean */ class OutboxPurger( private val outboxStore: OutboxStore, + private val transactionRunner: TransactionRunner? = null, private val config: OutboxPurgerConfig = OutboxPurgerConfig(), private val clock: Clock = Clock.systemUTC(), ) { @@ -58,7 +59,9 @@ class OutboxPurger( var totalDeleted = 0 var batches = 0 do { - val deleted = outboxStore.removeDeliveredBefore(cutoff, config.batchSize) + val deleted = transactionRunner?.runInTransaction { + outboxStore.removeDeliveredBefore(cutoff, config.batchSize) + } ?: outboxStore.removeDeliveredBefore(cutoff, config.batchSize) totalDeleted += deleted batches++ } while (deleted == config.batchSize && batches < MAX_BATCHES_PER_TICK) diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionContextValidator.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionContextValidator.kt index bcf51a0..3b30582 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionContextValidator.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionContextValidator.kt @@ -4,9 +4,8 @@ package com.softwaremill.okapi.core * Checks whether the current execution context is inside an active read-write transaction. * * Framework-specific modules provide implementations: - * - `okapi-spring`: [SpringTransactionContextValidator][com.softwaremill.okapi.springboot.SpringTransactionContextValidator] - * — checks via `TransactionSynchronizationManager` - * - `okapi-core`: [ExposedTransactionContextValidator] — checks via Exposed's `TransactionManager.currentOrNull()` + * - `okapi-spring-boot`: `SpringTransactionContextValidator` — checks via `TransactionSynchronizationManager` + * - `okapi-exposed`: `ExposedTransactionContextValidator` — checks via Exposed's `TransactionManager.currentOrNull()` * - Standalone: no-op (always returns true) */ interface TransactionContextValidator { diff --git a/okapi-exposed/build.gradle.kts b/okapi-exposed/build.gradle.kts new file mode 100644 index 0000000..033d656 --- /dev/null +++ b/okapi-exposed/build.gradle.kts @@ -0,0 +1,15 @@ +plugins { + id("buildsrc.convention.kotlin-jvm") + id("buildsrc.convention.publish") +} + +description = "Exposed ORM integration — ConnectionProvider, TransactionRunner, TransactionContextValidator" + +dependencies { + api(project(":okapi-core")) + implementation(libs.exposedJdbc) + + testImplementation(libs.kotestRunnerJunit5) + testImplementation(libs.kotestAssertionsCore) + testImplementation(libs.h2) +} diff --git a/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt new file mode 100644 index 0000000..939904a --- /dev/null +++ b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedConnectionProvider.kt @@ -0,0 +1,19 @@ +package com.softwaremill.okapi.exposed + +import com.softwaremill.okapi.core.ConnectionProvider +import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager +import java.sql.Connection + +/** + * Exposed implementation of [ConnectionProvider]. + * + * Retrieves the JDBC [Connection] from the current Exposed transaction. + * Use this when your application manages transactions via Exposed's + * `transaction(database) { }` blocks (e.g., Ktor + Exposed apps). + * + * The returned connection is **borrowed** from Exposed's active transaction — + * the caller must NOT close it. + */ +class ExposedConnectionProvider : ConnectionProvider { + override fun getConnection(): Connection = TransactionManager.current().connection.connection as Connection +} diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ExposedTransactionContextValidator.kt b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionContextValidator.kt similarity index 92% rename from okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ExposedTransactionContextValidator.kt rename to okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionContextValidator.kt index 685e5fc..5875443 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/ExposedTransactionContextValidator.kt +++ b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionContextValidator.kt @@ -1,5 +1,6 @@ -package com.softwaremill.okapi.core +package com.softwaremill.okapi.exposed +import com.softwaremill.okapi.core.TransactionContextValidator import org.jetbrains.exposed.v1.jdbc.Database import org.jetbrains.exposed.v1.jdbc.transactions.currentOrNull import org.jetbrains.exposed.v1.jdbc.transactions.transactionManager diff --git a/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionRunner.kt b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionRunner.kt new file mode 100644 index 0000000..ca800c6 --- /dev/null +++ b/okapi-exposed/src/main/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionRunner.kt @@ -0,0 +1,20 @@ +package com.softwaremill.okapi.exposed + +import com.softwaremill.okapi.core.TransactionRunner +import org.jetbrains.exposed.v1.jdbc.Database +import org.jetbrains.exposed.v1.jdbc.transactions.transaction + +/** + * Exposed implementation of [TransactionRunner]. + * + * Wraps the block in Exposed's `transaction(database) { }`. + * Used by the outbox scheduler/processor when running outside of + * an existing transactional context (e.g., background processing thread). + * + * @param database The [Database] instance where the outbox table resides. + */ +class ExposedTransactionRunner( + private val database: Database, +) : TransactionRunner { + override fun runInTransaction(block: () -> T): T = transaction(database) { block() } +} diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/ExposedTransactionContextValidatorTest.kt b/okapi-exposed/src/test/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionContextValidatorTest.kt similarity index 98% rename from okapi-core/src/test/kotlin/com/softwaremill/okapi/core/ExposedTransactionContextValidatorTest.kt rename to okapi-exposed/src/test/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionContextValidatorTest.kt index 2ae30d8..f1b4c10 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/ExposedTransactionContextValidatorTest.kt +++ b/okapi-exposed/src/test/kotlin/com/softwaremill/okapi/exposed/ExposedTransactionContextValidatorTest.kt @@ -1,4 +1,4 @@ -package com.softwaremill.okapi.core +package com.softwaremill.okapi.exposed import io.kotest.core.spec.style.BehaviorSpec import io.kotest.matchers.shouldBe diff --git a/okapi-integration-tests/build.gradle.kts b/okapi-integration-tests/build.gradle.kts index 80002e1..5c1b6ab 100644 --- a/okapi-integration-tests/build.gradle.kts +++ b/okapi-integration-tests/build.gradle.kts @@ -24,13 +24,6 @@ dependencies { testImplementation(libs.postgresql) testImplementation(libs.mysql) - // Exposed ORM (for transaction blocks and DB queries in tests) - testImplementation(libs.exposedCore) - testImplementation(libs.exposedJdbc) - testImplementation(libs.exposedJson) - testImplementation(libs.exposedJavaTime) - testImplementation(libs.exposedSpringTransaction) - // Liquibase (schema migrations in tests) testImplementation(libs.liquibaseCore) diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/ConcurrentClaimTests.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/ConcurrentClaimTests.kt index c3a0d64..55f1fd6 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/ConcurrentClaimTests.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/ConcurrentClaimTests.kt @@ -9,13 +9,13 @@ import com.softwaremill.okapi.core.OutboxProcessor import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore import com.softwaremill.okapi.core.RetryPolicy +import com.softwaremill.okapi.test.support.JdbcConnectionProvider import com.softwaremill.okapi.test.support.RecordingMessageDeliverer import io.kotest.assertions.withClue import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.maps.shouldContain import io.kotest.matchers.shouldBe -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import java.sql.Connection import java.time.Clock import java.time.Instant @@ -41,15 +41,18 @@ private fun createTestEntry(index: Int, now: Instant = Instant.parse("2024-01-01 fun FunSpec.concurrentClaimTests( dbName: String, + jdbcProvider: () -> JdbcConnectionProvider, storeFactory: () -> OutboxStore, startDb: () -> Unit, stopDb: () -> Unit, truncate: () -> Unit, ) { lateinit var store: OutboxStore + lateinit var jdbc: JdbcConnectionProvider beforeSpec { startDb() + jdbc = jdbcProvider() store = storeFactory() } @@ -63,7 +66,7 @@ fun FunSpec.concurrentClaimTests( test("[$dbName] concurrent claimPending with held locks produces disjoint sets") { // Insert 20 entries - val allIds = transaction { + val allIds = jdbc.withTransaction { (0 until 20).map { i -> val entry = createTestEntry(i) store.persist(entry) @@ -80,7 +83,7 @@ fun FunSpec.concurrentClaimTests( // next-key locks cause SKIP LOCKED to skip more rows than actually locked. val threadA = Thread.ofVirtual().name("processor-A").start { try { - transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { + jdbc.withTransaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { val claimed = store.claimPending(10) claimedByA.complete(claimed.map { it.outboxId }) lockAcquired.countDown() @@ -97,7 +100,7 @@ fun FunSpec.concurrentClaimTests( // Main thread: claim remaining entries (SKIP LOCKED should skip A's locked rows) val idsA = claimedByA.get(10, TimeUnit.SECONDS) - val idsB = transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { + val idsB = jdbc.withTransaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { store.claimPending(10) }.map { it.outboxId } @@ -123,7 +126,7 @@ fun FunSpec.concurrentClaimTests( val fixedClock = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC) // Insert 50 entries - transaction { + jdbc.withTransaction { (0 until 50).forEach { i -> store.persist(createTestEntry(i)) } } @@ -137,7 +140,7 @@ fun FunSpec.concurrentClaimTests( CompletableFuture.supplyAsync( { barrier.await(10, TimeUnit.SECONDS) - transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { + jdbc.withTransaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) { OutboxProcessor(store, entryProcessor).processNext(limit = 50) } }, @@ -156,7 +159,7 @@ fun FunSpec.concurrentClaimTests( } // Verify DB state - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } withClue("DB state after concurrent processing: $counts") { counts shouldContain (OutboxStatus.DELIVERED to 50L) counts shouldContain (OutboxStatus.PENDING to 0L) diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/MysqlConcurrentClaimTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/MysqlConcurrentClaimTest.kt index 7ac7287..f2cb4a5 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/MysqlConcurrentClaimTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/MysqlConcurrentClaimTest.kt @@ -12,7 +12,8 @@ class MysqlConcurrentClaimTest : FunSpec({ concurrentClaimTests( dbName = "mysql", - storeFactory = { MysqlOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, + jdbcProvider = { db.jdbc }, + storeFactory = { MysqlOutboxStore(db.jdbc, Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, startDb = { db.start() }, stopDb = { db.stop() }, truncate = { db.truncate() }, diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/PostgresConcurrentClaimTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/PostgresConcurrentClaimTest.kt index b4b1c96..19710bc 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/PostgresConcurrentClaimTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/concurrency/PostgresConcurrentClaimTest.kt @@ -12,7 +12,8 @@ class PostgresConcurrentClaimTest : FunSpec({ concurrentClaimTests( dbName = "postgres", - storeFactory = { PostgresOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, + jdbcProvider = { db.jdbc }, + storeFactory = { PostgresOutboxStore(db.jdbc, Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, startDb = { db.start() }, stopDb = { db.stop() }, truncate = { db.truncate() }, diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/HttpEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/HttpEndToEndTest.kt index d6832ba..81633e5 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/HttpEndToEndTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/HttpEndToEndTest.kt @@ -21,7 +21,6 @@ import com.softwaremill.okapi.postgres.PostgresOutboxStore import com.softwaremill.okapi.test.support.PostgresTestSupport import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.maps.shouldContain -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import java.time.Clock class HttpEndToEndTest : FunSpec({ @@ -45,7 +44,7 @@ class HttpEndToEndTest : FunSpec({ fun buildPipeline(maxRetries: Int = 3): Triple { val clock = Clock.systemUTC() - val store = PostgresOutboxStore(clock) + val store = PostgresOutboxStore(db.jdbc, clock) val publisher = OutboxPublisher(store, clock) val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } val entryProcessor = OutboxEntryProcessor( @@ -70,15 +69,15 @@ class HttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(200)), ) - transaction { publisher.publish(OutboxMessage("order.created", payload), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", payload), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } wiremock.verify( postRequestedFor(urlEqualTo("/api/notify")) .withRequestBody(equalTo(payload)), ) - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.DELIVERED to 1L) } @@ -90,10 +89,10 @@ class HttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(500)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 1L) counts shouldContain (OutboxStatus.DELIVERED to 0L) } @@ -106,10 +105,10 @@ class HttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(400)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.FAILED to 1L) counts shouldContain (OutboxStatus.PENDING to 0L) } @@ -122,10 +121,10 @@ class HttpEndToEndTest : FunSpec({ .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 1L) } @@ -133,13 +132,13 @@ class HttpEndToEndTest : FunSpec({ val (publisher, _, store) = buildPipeline() runCatching { - transaction { + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) error("Simulated business logic failure") } } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 0L) } @@ -151,19 +150,19 @@ class HttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(500)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } // First 3 processNext calls: retries 0->1, 1->2, 2->3 — stays PENDING repeat(3) { - transaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + db.jdbc.withTransaction { processor.processNext() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 1L) } // 4th processNext: retries==3, shouldRetry(3) returns false -> FAILED - transaction { processor.processNext() } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.FAILED to 1L) counts shouldContain (OutboxStatus.PENDING to 0L) } diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/KafkaEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/KafkaEndToEndTest.kt index 02bee9f..3fa9c9f 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/KafkaEndToEndTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/KafkaEndToEndTest.kt @@ -15,7 +15,6 @@ import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.maps.shouldContain import io.kotest.matchers.shouldBe import org.apache.kafka.clients.producer.KafkaProducer -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import java.time.Clock import java.time.Duration import java.util.UUID @@ -43,7 +42,7 @@ class KafkaEndToEndTest : FunSpec({ test("full pipeline: publish to outbox -> processNext -> message on Kafka topic") { val clock = Clock.systemUTC() - val store = PostgresOutboxStore(clock) + val store = PostgresOutboxStore(db.jdbc, clock) val publisher = OutboxPublisher(store, clock) val deliverer = KafkaMessageDeliverer(producer!!) val entryProcessor = OutboxEntryProcessor(deliverer, RetryPolicy(maxRetries = 3), clock) @@ -56,10 +55,10 @@ class KafkaEndToEndTest : FunSpec({ partitionKey = "user-1" } - transaction { publisher.publish(OutboxMessage("order.created", payload), info) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", payload), info) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.DELIVERED to 1L) val consumer = kafka.createConsumer(groupId = "e2e-test-${UUID.randomUUID()}") diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/MysqlHttpEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/MysqlHttpEndToEndTest.kt index 99d6d22..41c5056 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/MysqlHttpEndToEndTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/MysqlHttpEndToEndTest.kt @@ -21,7 +21,6 @@ import com.softwaremill.okapi.mysql.MysqlOutboxStore import com.softwaremill.okapi.test.support.MysqlTestSupport import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.maps.shouldContain -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import java.time.Clock class MysqlHttpEndToEndTest : FunSpec({ @@ -45,7 +44,7 @@ class MysqlHttpEndToEndTest : FunSpec({ fun buildPipeline(): Triple { val clock = Clock.systemUTC() - val store = MysqlOutboxStore(clock) + val store = MysqlOutboxStore(db.jdbc, clock) val publisher = OutboxPublisher(store, clock) val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } val entryProcessor = OutboxEntryProcessor( @@ -70,15 +69,15 @@ class MysqlHttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(200)), ) - transaction { publisher.publish(OutboxMessage("order.created", payload), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", payload), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } wiremock.verify( postRequestedFor(urlEqualTo("/api/notify")) .withRequestBody(equalTo(payload)), ) - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.DELIVERED to 1L) } @@ -90,10 +89,10 @@ class MysqlHttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(500)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 1L) counts shouldContain (OutboxStatus.DELIVERED to 0L) } @@ -106,10 +105,10 @@ class MysqlHttpEndToEndTest : FunSpec({ .willReturn(aResponse().withStatus(400)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.FAILED to 1L) counts shouldContain (OutboxStatus.PENDING to 0L) } @@ -122,10 +121,10 @@ class MysqlHttpEndToEndTest : FunSpec({ .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)), ) - transaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } - transaction { processor.processNext() } + db.jdbc.withTransaction { publisher.publish(OutboxMessage("order.created", """{"id":"1"}"""), deliveryInfo()) } + db.jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = db.jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 1L) } }) diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/MysqlOutboxStoreTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/MysqlOutboxStoreTest.kt index c2d4c51..8576fd4 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/MysqlOutboxStoreTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/MysqlOutboxStoreTest.kt @@ -12,7 +12,13 @@ class MysqlOutboxStoreTest : FunSpec({ outboxStoreContractTests( dbName = "mysql", - storeFactory = { MysqlOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, + storeFactory = { + MysqlOutboxStore( + connectionProvider = db.jdbc, + clock = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC), + ) + }, + jdbcProvider = { db.jdbc }, startDb = { db.start() }, stopDb = { db.stop() }, truncate = { db.truncate() }, diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt index 58a0260..04ce1a2 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/OutboxStoreContractTests.kt @@ -5,11 +5,11 @@ import com.softwaremill.okapi.core.OutboxEntry import com.softwaremill.okapi.core.OutboxMessage import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.test.support.JdbcConnectionProvider import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.maps.shouldContain import io.kotest.matchers.shouldBe -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import java.time.Instant private class StubDeliveryInfo( @@ -33,14 +33,17 @@ private fun createTestEntry( fun FunSpec.outboxStoreContractTests( dbName: String, storeFactory: () -> OutboxStore, + jdbcProvider: () -> JdbcConnectionProvider, startDb: () -> Unit, stopDb: () -> Unit, truncate: () -> Unit, ) { lateinit var store: OutboxStore + lateinit var jdbc: JdbcConnectionProvider beforeSpec { startDb() + jdbc = jdbcProvider() store = storeFactory() } @@ -55,9 +58,9 @@ fun FunSpec.outboxStoreContractTests( test("[$dbName] persist and read back via claimPending") { val entry = createTestEntry() - transaction { store.persist(entry) } + jdbc.withTransaction { store.persist(entry) } - val claimed = transaction { store.claimPending(10) } + val claimed = jdbc.withTransaction { store.claimPending(10) } claimed shouldHaveSize 1 val found = claimed.first() @@ -75,18 +78,17 @@ fun FunSpec.outboxStoreContractTests( val t2 = Instant.parse("2024-01-02T00:00:00Z") val t3 = Instant.parse("2024-01-03T00:00:00Z") - // Insert in non-sequential order to verify ordering val e2 = createTestEntry(now = t2, messageType = "type.second") val e3 = createTestEntry(now = t3, messageType = "type.third") val e1 = createTestEntry(now = t1, messageType = "type.first") - transaction { + jdbc.withTransaction { store.persist(e2) store.persist(e3) store.persist(e1) } - val claimed = transaction { store.claimPending(10) } + val claimed = jdbc.withTransaction { store.claimPending(10) } claimed shouldHaveSize 3 claimed[0].messageType shouldBe "type.first" @@ -95,7 +97,7 @@ fun FunSpec.outboxStoreContractTests( } test("[$dbName] claimPending respects limit") { - transaction { + jdbc.withTransaction { repeat(5) { i -> val entry = createTestEntry( now = Instant.parse("2024-01-01T00:00:00Z").plusSeconds(i.toLong()), @@ -105,7 +107,7 @@ fun FunSpec.outboxStoreContractTests( } } - val claimed = transaction { store.claimPending(2) } + val claimed = jdbc.withTransaction { store.claimPending(2) } claimed shouldHaveSize 2 } @@ -120,19 +122,18 @@ fun FunSpec.outboxStoreContractTests( messageType = "type.delivered", ) - transaction { + jdbc.withTransaction { store.persist(pendingEntry) store.persist(toBeDelivered) } - // Claim the second entry and mark it delivered - transaction { + jdbc.withTransaction { val claimed = store.claimPending(10) val deliveredCandidate = claimed.first { it.outboxId == toBeDelivered.outboxId } store.updateAfterProcessing(deliveredCandidate.toDelivered(Instant.parse("2024-01-02T00:00:00Z"))) } - val claimed = transaction { store.claimPending(10) } + val claimed = jdbc.withTransaction { store.claimPending(10) } claimed shouldHaveSize 1 claimed.first().messageType shouldBe "type.pending" @@ -141,15 +142,15 @@ fun FunSpec.outboxStoreContractTests( test("[$dbName] updateAfterProcessing persists status change") { val entry = createTestEntry() - transaction { store.persist(entry) } + jdbc.withTransaction { store.persist(entry) } - transaction { + jdbc.withTransaction { val claimed = store.claimPending(10) val delivered = claimed.first().toDelivered(Instant.parse("2024-01-02T00:00:00Z")) store.updateAfterProcessing(delivered) } - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.DELIVERED to 1L) counts shouldContain (OutboxStatus.PENDING to 0L) @@ -165,13 +166,12 @@ fun FunSpec.outboxStoreContractTests( messageType = "type.recent", ) - transaction { + jdbc.withTransaction { store.persist(oldEntry) store.persist(recentEntry) } - // Mark both as delivered with different lastAttempt timestamps - transaction { + jdbc.withTransaction { val claimed = store.claimPending(10) val old = claimed.first { it.outboxId == oldEntry.outboxId } val recent = claimed.first { it.outboxId == recentEntry.outboxId } @@ -179,10 +179,9 @@ fun FunSpec.outboxStoreContractTests( store.updateAfterProcessing(recent.toDelivered(Instant.parse("2024-01-11T00:00:00Z"))) } - // Remove delivered before Jan 5 — should delete old (lastAttempt=Jan 2) but keep recent (lastAttempt=Jan 11) - transaction { store.removeDeliveredBefore(Instant.parse("2024-01-05T00:00:00Z"), limit = 100) } + jdbc.withTransaction { store.removeDeliveredBefore(Instant.parse("2024-01-05T00:00:00Z"), limit = 100) } - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.DELIVERED to 1L) } @@ -204,14 +203,14 @@ fun FunSpec.outboxStoreContractTests( messageType = "type.failed", ) - transaction { + jdbc.withTransaction { store.persist(pending1) store.persist(pending2) store.persist(toDeliver) store.persist(toFail) } - transaction { + jdbc.withTransaction { val claimed = store.claimPending(10) val deliverEntry = claimed.first { it.outboxId == toDeliver.outboxId } val failEntry = claimed.first { it.outboxId == toFail.outboxId } @@ -219,7 +218,7 @@ fun FunSpec.outboxStoreContractTests( store.updateAfterProcessing(failEntry.toFailed(Instant.parse("2024-01-02T00:00:00Z"), "some error")) } - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } counts shouldContain (OutboxStatus.PENDING to 2L) counts shouldContain (OutboxStatus.DELIVERED to 1L) @@ -227,7 +226,7 @@ fun FunSpec.outboxStoreContractTests( } test("[$dbName] claimPending returns empty when no PENDING entries") { - val claimed = transaction { store.claimPending(10) } + val claimed = jdbc.withTransaction { store.claimPending(10) } claimed shouldHaveSize 0 } diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/PostgresOutboxStoreTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/PostgresOutboxStoreTest.kt index 1c77bf1..0f2d051 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/PostgresOutboxStoreTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/store/PostgresOutboxStoreTest.kt @@ -12,7 +12,13 @@ class PostgresOutboxStoreTest : FunSpec({ outboxStoreContractTests( dbName = "postgres", - storeFactory = { PostgresOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) }, + storeFactory = { + PostgresOutboxStore( + connectionProvider = db.jdbc, + clock = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC), + ) + }, + jdbcProvider = { db.jdbc }, startDb = { db.start() }, stopDb = { db.stop() }, truncate = { db.truncate() }, diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/JdbcConnectionProvider.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/JdbcConnectionProvider.kt new file mode 100644 index 0000000..eb7241c --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/JdbcConnectionProvider.kt @@ -0,0 +1,38 @@ +package com.softwaremill.okapi.test.support + +import com.softwaremill.okapi.core.ConnectionProvider +import java.sql.Connection +import javax.sql.DataSource + +/** + * Test helper that provides a [ConnectionProvider] backed by a [ThreadLocal] connection. + * Use [withTransaction] to bind a JDBC connection for the duration of a block. + */ +class JdbcConnectionProvider(private val dataSource: DataSource) : ConnectionProvider { + private val threadLocalConnection = ThreadLocal() + + override fun getConnection(): Connection = threadLocalConnection.get() + ?: throw IllegalStateException("No connection bound to current thread. Use withTransaction { } in tests.") + + fun withTransaction(block: () -> T): T = withTransaction(transactionIsolation = null, block) + + fun withTransaction(transactionIsolation: Int?, block: () -> T): T { + val conn = dataSource.connection + conn.autoCommit = false + if (transactionIsolation != null) { + conn.transactionIsolation = transactionIsolation + } + threadLocalConnection.set(conn) + return try { + val result = block() + conn.commit() + result + } catch (e: Exception) { + conn.rollback() + throw e + } finally { + threadLocalConnection.remove() + conn.close() + } + } +} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt index 70944d8..e883eeb 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/MysqlTestSupport.kt @@ -1,25 +1,27 @@ package com.softwaremill.okapi.test.support +import com.mysql.cj.jdbc.MysqlDataSource import liquibase.Liquibase import liquibase.database.DatabaseFactory import liquibase.database.jvm.JdbcConnection import liquibase.resource.ClassLoaderResourceAccessor -import org.jetbrains.exposed.v1.jdbc.Database -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import org.testcontainers.containers.MySQLContainer import java.sql.DriverManager +import javax.sql.DataSource class MysqlTestSupport { val container = MySQLContainer("mysql:8.0") + lateinit var dataSource: DataSource + lateinit var jdbc: JdbcConnectionProvider fun start() { container.start() - Database.connect( - url = container.jdbcUrl, - driver = container.driverClassName, - user = container.username, - password = container.password, - ) + dataSource = MysqlDataSource().apply { + setURL(container.jdbcUrl) + user = container.username + setPassword(container.password) + } + jdbc = JdbcConnectionProvider(dataSource) runLiquibase() } @@ -28,7 +30,9 @@ class MysqlTestSupport { } fun truncate() { - transaction { exec("DELETE FROM outbox") } + jdbc.withTransaction { + jdbc.getConnection().createStatement().use { it.execute("DELETE FROM outbox") } + } } private fun runLiquibase() { diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt index 3a54934..aab6a75 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt @@ -4,22 +4,24 @@ import liquibase.Liquibase import liquibase.database.DatabaseFactory import liquibase.database.jvm.JdbcConnection import liquibase.resource.ClassLoaderResourceAccessor -import org.jetbrains.exposed.v1.jdbc.Database -import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import org.postgresql.ds.PGSimpleDataSource import org.testcontainers.containers.PostgreSQLContainer import java.sql.DriverManager +import javax.sql.DataSource class PostgresTestSupport { val container = PostgreSQLContainer("postgres:16") + lateinit var dataSource: DataSource + lateinit var jdbc: JdbcConnectionProvider fun start() { container.start() - Database.connect( - url = container.jdbcUrl, - driver = container.driverClassName, - user = container.username, - password = container.password, - ) + dataSource = PGSimpleDataSource().apply { + setURL(container.jdbcUrl) + user = container.username + password = container.password + } + jdbc = JdbcConnectionProvider(dataSource) runLiquibase() } @@ -28,7 +30,9 @@ class PostgresTestSupport { } fun truncate() { - transaction { exec("TRUNCATE TABLE outbox") } + jdbc.withTransaction { + jdbc.getConnection().createStatement().use { it.execute("TRUNCATE TABLE outbox") } + } } private fun runLiquibase() { diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt index 4f4fff6..aa75755 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt @@ -5,6 +5,7 @@ import com.softwaremill.okapi.core.OutboxMessage import com.softwaremill.okapi.core.OutboxPublisher import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.postgres.PostgresOutboxStore +import com.softwaremill.okapi.springboot.SpringConnectionProvider import com.softwaremill.okapi.springboot.SpringOutboxPublisher import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.FunSpec @@ -14,7 +15,6 @@ import liquibase.Liquibase import liquibase.database.DatabaseFactory import liquibase.database.jvm.JdbcConnection import liquibase.resource.ClassLoaderResourceAccessor -import org.jetbrains.exposed.v1.spring7.transaction.SpringTransactionManager import org.postgresql.ds.PGSimpleDataSource import org.springframework.jdbc.datasource.DataSourceTransactionManager import org.springframework.transaction.support.TransactionTemplate @@ -31,8 +31,7 @@ import javax.sql.DataSource * - **outboxContainer**: hosts the outbox table (Liquibase migration applied) * - **otherContainer**: a second database with no outbox table * - * The outbox DataSource uses [SpringTransactionManager] (Exposed-compatible), - * while the other DataSource uses plain [DataSourceTransactionManager]. + * Both DataSources use plain [DataSourceTransactionManager]. */ class MultiDataSourceTransactionTest : FunSpec({ @@ -74,16 +73,14 @@ class MultiDataSourceTransactionTest : FunSpec({ // Run Liquibase migration only on the outbox database runLiquibase(outboxContainer) - // SpringTransactionManager (Exposed) for the outbox DataSource — - // PostgresOutboxStore uses Exposed internally, so the transaction must be Exposed-compatible - val outboxTxManager = SpringTransactionManager(outboxDataSource) + val outboxTxManager = DataSourceTransactionManager(outboxDataSource) outboxTxTemplate = TransactionTemplate(outboxTxManager) // Plain DataSourceTransactionManager for the other DataSource val otherTxManager = DataSourceTransactionManager(otherDataSource) otherTxTemplate = TransactionTemplate(otherTxManager) - store = PostgresOutboxStore(clock) + store = PostgresOutboxStore(SpringConnectionProvider(outboxDataSource), clock) val corePublisher = OutboxPublisher(store, clock) publisher = SpringOutboxPublisher(delegate = corePublisher, dataSource = outboxDataSource) } diff --git a/okapi-mysql/build.gradle.kts b/okapi-mysql/build.gradle.kts index 1ba1d75..dd671e5 100644 --- a/okapi-mysql/build.gradle.kts +++ b/okapi-mysql/build.gradle.kts @@ -3,27 +3,15 @@ plugins { id("buildsrc.convention.publish") } -description = "MySQL outbox store using Exposed" +description = "MySQL outbox store using plain JDBC" dependencies { implementation(project(":okapi-core")) - compileOnly(libs.exposedCore) - compileOnly(libs.exposedJdbc) - compileOnly(libs.exposedJson) - compileOnly(libs.exposedJavaTime) - - implementation(libs.jacksonModuleKotlin) - implementation(libs.jacksonDatatypeJsr310) - compileOnly(libs.liquibaseCore) testImplementation(libs.kotestRunnerJunit5) testImplementation(libs.kotestAssertionsCore) testImplementation(libs.testcontainersMysql) testImplementation(libs.mysql) - testImplementation(libs.exposedCore) - testImplementation(libs.exposedJdbc) - testImplementation(libs.exposedJson) - testImplementation(libs.exposedJavaTime) } diff --git a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt index 424f938..fd2e824 100644 --- a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt +++ b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt @@ -1,39 +1,55 @@ package com.softwaremill.okapi.mysql +import com.softwaremill.okapi.core.ConnectionProvider import com.softwaremill.okapi.core.OutboxEntry import com.softwaremill.okapi.core.OutboxId import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore -import org.jetbrains.exposed.v1.core.IntegerColumnType -import org.jetbrains.exposed.v1.core.alias -import org.jetbrains.exposed.v1.core.count -import org.jetbrains.exposed.v1.core.inList -import org.jetbrains.exposed.v1.core.min -import org.jetbrains.exposed.v1.jdbc.select -import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager -import org.jetbrains.exposed.v1.jdbc.upsert import java.sql.ResultSet +import java.sql.Timestamp import java.time.Clock import java.time.Instant import java.util.UUID -/** MySQL [OutboxStore] implementation using Exposed. */ +/** MySQL [OutboxStore] implementation using plain JDBC. */ class MysqlOutboxStore( - private val clock: Clock, + private val connectionProvider: ConnectionProvider, + private val clock: Clock = Clock.systemUTC(), ) : OutboxStore { + override fun persist(entry: OutboxEntry): OutboxEntry { - OutboxTable.upsert { - it[id] = entry.outboxId - it[messageType] = entry.messageType - it[payload] = entry.payload - it[deliveryType] = entry.deliveryType - it[status] = entry.status.name - it[createdAt] = entry.createdAt - it[updatedAt] = entry.updatedAt - it[retries] = entry.retries - it[lastAttempt] = entry.lastAttempt - it[lastError] = entry.lastError - it[deliveryMetadata] = entry.deliveryMetadata + val sql = """ + INSERT INTO outbox (id, message_type, payload, delivery_type, status, created_at, updated_at, retries, last_attempt, last_error, delivery_metadata) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON DUPLICATE KEY UPDATE + status = VALUES(status), + updated_at = VALUES(updated_at), + retries = VALUES(retries), + last_attempt = VALUES(last_attempt), + last_error = VALUES(last_error), + delivery_metadata = VALUES(delivery_metadata) + """.trimIndent() + + connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.setString(1, entry.outboxId.raw.toString()) + stmt.setString(2, entry.messageType) + stmt.setString(3, entry.payload) + stmt.setString(4, entry.deliveryType) + stmt.setString(5, entry.status.name) + stmt.setTimestamp(6, Timestamp.from(entry.createdAt)) + stmt.setTimestamp(7, Timestamp.from(entry.updatedAt)) + stmt.setInt(8, entry.retries) + if (entry.lastAttempt != null) { + stmt.setTimestamp( + 9, + Timestamp.from(entry.lastAttempt), + ) + } else { + stmt.setNull(9, java.sql.Types.TIMESTAMP) + } + if (entry.lastError != null) stmt.setString(10, entry.lastError) else stmt.setNull(10, java.sql.Types.VARCHAR) + stmt.setString(11, entry.deliveryMetadata) + stmt.executeUpdate() } return entry } @@ -42,81 +58,90 @@ class MysqlOutboxStore( // FORCE INDEX ensures InnoDB walks the (status, created_at) index so // that FOR UPDATE SKIP LOCKED only row-locks the rows actually returned // by LIMIT, rather than every row matching the WHERE clause. - val nativeQuery = - "SELECT * FROM ${OutboxTable.tableName}" + - " FORCE INDEX (idx_outbox_status_created_at)" + - " WHERE ${OutboxTable.status.name} = '${OutboxStatus.PENDING}'" + - " ORDER BY ${OutboxTable.createdAt.name} ASC" + - " LIMIT $limit FOR UPDATE SKIP LOCKED" + val sql = """ + SELECT * FROM outbox + FORCE INDEX (idx_outbox_status_created_at) + WHERE status = ? + ORDER BY created_at ASC + LIMIT ? + FOR UPDATE SKIP LOCKED + """.trimIndent() - return TransactionManager.current().exec(nativeQuery) { rs -> - generateSequence { - if (rs.next()) mapFromResultSet(rs) else null - }.toList() - } ?: emptyList() + return connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.setString(1, OutboxStatus.PENDING.name) + stmt.setInt(2, limit) + stmt.executeQuery().use { rs -> + generateSequence { if (rs.next()) rs.toOutboxEntry() else null }.toList() + } + } } override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = persist(entry) override fun removeDeliveredBefore(time: Instant, limit: Int): Int { val sql = """ - DELETE FROM ${OutboxTable.tableName} WHERE ${OutboxTable.id.name} IN ( - SELECT ${OutboxTable.id.name} FROM ( - SELECT ${OutboxTable.id.name} FROM ${OutboxTable.tableName} - WHERE ${OutboxTable.status.name} = '${OutboxStatus.DELIVERED}' - AND ${OutboxTable.lastAttempt.name} < ? - ORDER BY ${OutboxTable.id.name} + DELETE FROM outbox WHERE id IN ( + SELECT id FROM ( + SELECT id FROM outbox + WHERE status = ? + AND last_attempt < ? + ORDER BY id LIMIT ? FOR UPDATE SKIP LOCKED ) AS batch ) """.trimIndent() - val statement = TransactionManager.current().connection.prepareStatement(sql, false) - statement.fillParameters( - listOf( - OutboxTable.lastAttempt.columnType to time, - IntegerColumnType() to limit, - ), - ) - return statement.executeUpdate() + return connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.setString(1, OutboxStatus.DELIVERED.name) + stmt.setTimestamp(2, Timestamp.from(time)) + stmt.setInt(3, limit) + stmt.executeUpdate() + } } override fun findOldestCreatedAt(statuses: Set): Map { val result = statuses.associateWith { clock.instant() }.toMutableMap() - val minAlias = OutboxTable.createdAt.min().alias("min_created_at") - OutboxTable - .select(OutboxTable.status, minAlias) - .where { OutboxTable.status inList statuses.map { status -> status.name } } - .groupBy(OutboxTable.status) - .forEach { row -> - val s = OutboxStatus.from(row[OutboxTable.status]) - result[s] = requireNotNull(row[minAlias]) + val placeholders = statuses.joinToString(",") { "?" } + val sql = "SELECT status, MIN(created_at) AS min_created_at FROM outbox WHERE status IN ($placeholders) GROUP BY status" + + connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + statuses.forEachIndexed { i, s -> stmt.setString(i + 1, s.name) } + stmt.executeQuery().use { rs -> + while (rs.next()) { + val s = OutboxStatus.from(rs.getString("status")) + result[s] = rs.getTimestamp("min_created_at").toInstant() + } } + } return result } override fun countByStatuses(): Map { - val countAlias = OutboxTable.status.count().alias("count") - val counts = - OutboxTable - .select(OutboxTable.status, countAlias) - .groupBy(OutboxTable.status) - .associate { row -> OutboxStatus.from(row[OutboxTable.status]) to row[countAlias] } - return OutboxStatus.entries.associateWith { status -> counts[status] ?: 0L } + val sql = "SELECT status, COUNT(*) AS count FROM outbox GROUP BY status" + val counts = mutableMapOf() + + connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.executeQuery().use { rs -> + while (rs.next()) { + counts[OutboxStatus.from(rs.getString("status"))] = rs.getLong("count") + } + } + } + return OutboxStatus.entries.associateWith { counts[it] ?: 0L } } - private fun mapFromResultSet(rs: ResultSet): OutboxEntry = OutboxEntry( - outboxId = OutboxId(UUID.fromString(rs.getString("id"))), - messageType = rs.getString("message_type"), - payload = rs.getString("payload"), - deliveryType = rs.getString("delivery_type"), - status = OutboxStatus.from(rs.getString("status")), - createdAt = rs.getTimestamp("created_at").toInstant(), - updatedAt = rs.getTimestamp("updated_at").toInstant(), - retries = rs.getInt("retries"), - lastAttempt = rs.getTimestamp("last_attempt")?.toInstant(), - lastError = rs.getString("last_error"), - deliveryMetadata = rs.getString("delivery_metadata"), + private fun ResultSet.toOutboxEntry(): OutboxEntry = OutboxEntry( + outboxId = OutboxId(UUID.fromString(getString("id"))), + messageType = getString("message_type"), + payload = getString("payload"), + deliveryType = getString("delivery_type"), + status = OutboxStatus.from(getString("status")), + createdAt = getTimestamp("created_at").toInstant(), + updatedAt = getTimestamp("updated_at").toInstant(), + retries = getInt("retries"), + lastAttempt = getTimestamp("last_attempt")?.toInstant(), + lastError = getString("last_error"), + deliveryMetadata = getString("delivery_metadata"), ) } diff --git a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/OutboxTable.kt b/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/OutboxTable.kt deleted file mode 100644 index a525706..0000000 --- a/okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/OutboxTable.kt +++ /dev/null @@ -1,31 +0,0 @@ -package com.softwaremill.okapi.mysql - -import com.softwaremill.okapi.core.OutboxId -import com.softwaremill.okapi.core.OutboxStatus -import org.jetbrains.exposed.v1.core.Table -import org.jetbrains.exposed.v1.javatime.timestamp -import org.jetbrains.exposed.v1.json.json -import java.util.UUID - -internal object OutboxTable : Table("outbox") { - val id = varchar("id", 36).transform( - wrap = { str -> OutboxId(UUID.fromString(str)) }, - unwrap = { outboxId -> outboxId.raw.toString() }, - ) - val messageType = varchar("message_type", 255) - val payload = text("payload") - val deliveryType = varchar("delivery_type", 50) - val status = varchar("status", 50).default(OutboxStatus.PENDING.name) - val createdAt = timestamp("created_at") - val updatedAt = timestamp("updated_at") - val retries = integer("retries").default(0) - val lastAttempt = timestamp("last_attempt").nullable() - val lastError = text("last_error").nullable() - val deliveryMetadata = json("delivery_metadata", { it }, { it }) - - override val primaryKey = PrimaryKey(id) - - init { - index("idx_outbox_status_created_at", isUnique = false, status, createdAt) - } -} diff --git a/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt b/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt deleted file mode 100644 index cbaa5c3..0000000 --- a/okapi-mysql/src/test/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStoreTest.kt +++ /dev/null @@ -1,147 +0,0 @@ -package com.softwaremill.okapi.mysql - -import com.softwaremill.okapi.core.OutboxEntry -import com.softwaremill.okapi.core.OutboxId -import com.softwaremill.okapi.core.OutboxStatus -import io.kotest.core.spec.style.BehaviorSpec -import io.kotest.matchers.collections.shouldHaveSize -import io.kotest.matchers.shouldBe -import org.jetbrains.exposed.v1.jdbc.Database -import org.jetbrains.exposed.v1.jdbc.SchemaUtils -import org.jetbrains.exposed.v1.jdbc.transactions.transaction -import org.testcontainers.containers.MySQLContainer -import java.time.Clock -import java.time.Instant - -class MysqlOutboxStoreTest : BehaviorSpec({ - val mysql = MySQLContainer("mysql:8.0").apply { start() } - - val db = Database.connect( - url = mysql.jdbcUrl, - driver = mysql.driverClassName, - user = mysql.username, - password = mysql.password, - ) - - val clock = Clock.systemUTC() - val store = MysqlOutboxStore(clock) - - beforeSpec { - transaction(db) { - SchemaUtils.create(OutboxTable) - } - } - - afterSpec { - mysql.stop() - } - - given("persist and claimPending") { - `when`("an entry is persisted") { - val entry = newEntry() - transaction(db) { store.persist(entry) } - - then("claimPending returns it") { - val claimed = transaction(db) { store.claimPending(10) } - claimed shouldHaveSize 1 - claimed.first().outboxId shouldBe entry.outboxId - claimed.first().status shouldBe OutboxStatus.PENDING - } - } - } - - given("updateAfterProcessing") { - `when`("entry is marked DELIVERED") { - val entry = newEntry() - transaction(db) { store.persist(entry) } - - val delivered = entry.copy(status = OutboxStatus.DELIVERED, lastAttempt = Instant.now(clock)) - transaction(db) { store.updateAfterProcessing(delivered) } - - then("claimPending no longer returns it") { - val claimed = transaction(db) { store.claimPending(10) } - claimed.none { it.outboxId == entry.outboxId } shouldBe true - } - } - } - - given("removeDeliveredBefore") { - `when`("called with a cutoff time") { - transaction(db) { exec("DELETE FROM outbox") } - val entry = newEntry() - val delivered = entry.copy( - status = OutboxStatus.DELIVERED, - lastAttempt = Instant.parse("2020-01-01T00:00:00Z"), - ) - transaction(db) { store.persist(delivered) } - transaction(db) { store.removeDeliveredBefore(Instant.parse("2025-01-01T00:00:00Z"), Int.MAX_VALUE) } - - then("old delivered entries are removed") { - val counts = transaction(db) { store.countByStatuses() } - counts[OutboxStatus.DELIVERED] shouldBe 0L - } - } - } - - given("removeDeliveredBefore with limit") { - `when`("limit is smaller than matching entries") { - transaction(db) { exec("DELETE FROM outbox") } - repeat(5) { - val entry = newEntry().copy( - status = OutboxStatus.DELIVERED, - lastAttempt = Instant.parse("2020-01-01T00:00:00Z"), - ) - transaction(db) { store.persist(entry) } - } - - val deleted = transaction(db) { - store.removeDeliveredBefore(Instant.parse("2025-01-01T00:00:00Z"), 3) - } - - then("only deletes up to limit") { - deleted shouldBe 3 - } - then("remaining entries still exist") { - val counts = transaction(db) { store.countByStatuses() } - counts[OutboxStatus.DELIVERED] shouldBe 2L - } - } - } - - given("countByStatuses") { - `when`("entries exist with different statuses") { - transaction(db) { - exec("DELETE FROM outbox") - store.persist(newEntry().copy(status = OutboxStatus.PENDING)) - store.persist(newEntry().copy(status = OutboxStatus.PENDING)) - store.persist( - newEntry().copy( - status = OutboxStatus.DELIVERED, - lastAttempt = Instant.now(clock), - ), - ) - } - - then("returns correct counts per status") { - val counts = transaction(db) { store.countByStatuses() } - counts[OutboxStatus.PENDING] shouldBe 2L - counts[OutboxStatus.DELIVERED] shouldBe 1L - counts[OutboxStatus.FAILED] shouldBe 0L - } - } - } -}) - -private fun newEntry() = OutboxEntry( - outboxId = OutboxId.new(), - messageType = "test.event", - payload = """{"key": "value"}""", - deliveryType = "http", - status = OutboxStatus.PENDING, - createdAt = Instant.now(), - updatedAt = Instant.now(), - retries = 0, - lastAttempt = null, - lastError = null, - deliveryMetadata = """{"type": "http", "url": "http://localhost"}""", -) diff --git a/okapi-postgres/build.gradle.kts b/okapi-postgres/build.gradle.kts index ac12d31..6185570 100644 --- a/okapi-postgres/build.gradle.kts +++ b/okapi-postgres/build.gradle.kts @@ -3,27 +3,15 @@ plugins { id("buildsrc.convention.publish") } -description = "PostgreSQL outbox store using Exposed" +description = "PostgreSQL outbox store using plain JDBC" dependencies { implementation(project(":okapi-core")) - compileOnly(libs.exposedCore) - compileOnly(libs.exposedJdbc) - compileOnly(libs.exposedJson) - compileOnly(libs.exposedJavaTime) - - implementation(libs.jacksonModuleKotlin) - implementation(libs.jacksonDatatypeJsr310) - compileOnly(libs.liquibaseCore) testImplementation(libs.kotestRunnerJunit5) testImplementation(libs.kotestAssertionsCore) testImplementation(libs.testcontainersPostgresql) testImplementation(libs.postgresql) - testImplementation(libs.exposedCore) - testImplementation(libs.exposedJdbc) - testImplementation(libs.exposedJson) - testImplementation(libs.exposedJavaTime) } diff --git a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/OutboxTable.kt b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/OutboxTable.kt deleted file mode 100644 index 4aeedc0..0000000 --- a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/OutboxTable.kt +++ /dev/null @@ -1,24 +0,0 @@ -package com.softwaremill.okapi.postgres - -import com.softwaremill.okapi.core.OutboxId -import com.softwaremill.okapi.core.OutboxStatus -import org.jetbrains.exposed.v1.core.Table -import org.jetbrains.exposed.v1.core.java.javaUUID -import org.jetbrains.exposed.v1.javatime.timestamp -import org.jetbrains.exposed.v1.json.json - -internal object OutboxTable : Table("outbox") { - val id = javaUUID("id").transform(wrap = { uuid -> OutboxId(uuid) }, unwrap = { outboxId -> outboxId.raw }) - val messageType = varchar("message_type", 255) - val payload = text("payload") - val deliveryType = varchar("delivery_type", 50) - val status = varchar("status", 50).default(OutboxStatus.PENDING.name) - val createdAt = timestamp("created_at") - val updatedAt = timestamp("updated_at") - val retries = integer("retries").default(0) - val lastAttempt = timestamp("last_attempt").nullable() - val lastError = text("last_error").nullable() - val deliveryMetadata = json("delivery_metadata", { it }, { it }) - - override val primaryKey = PrimaryKey(id) -} diff --git a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt index 68fa2a6..40d7ad2 100644 --- a/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt +++ b/okapi-postgres/src/main/kotlin/com/softwaremill/okapi/postgres/PostgresOutboxStore.kt @@ -1,113 +1,141 @@ package com.softwaremill.okapi.postgres +import com.softwaremill.okapi.core.ConnectionProvider import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxId import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore -import org.jetbrains.exposed.v1.core.IntegerColumnType -import org.jetbrains.exposed.v1.core.ResultRow -import org.jetbrains.exposed.v1.core.SortOrder -import org.jetbrains.exposed.v1.core.alias -import org.jetbrains.exposed.v1.core.count -import org.jetbrains.exposed.v1.core.eq -import org.jetbrains.exposed.v1.core.inList -import org.jetbrains.exposed.v1.core.min -import org.jetbrains.exposed.v1.core.vendors.ForUpdateOption -import org.jetbrains.exposed.v1.jdbc.select -import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager -import org.jetbrains.exposed.v1.jdbc.upsert +import java.sql.ResultSet +import java.sql.Timestamp import java.time.Clock import java.time.Instant +import java.util.UUID -/** PostgreSQL [OutboxStore] implementation using Exposed. */ +/** PostgreSQL [OutboxStore] implementation using plain JDBC. */ class PostgresOutboxStore( - private val clock: Clock, + private val connectionProvider: ConnectionProvider, + private val clock: Clock = Clock.systemUTC(), ) : OutboxStore { + override fun persist(entry: OutboxEntry): OutboxEntry { - OutboxTable.upsert { - it[id] = entry.outboxId - it[messageType] = entry.messageType - it[payload] = entry.payload - it[deliveryType] = entry.deliveryType - it[status] = entry.status.name - it[createdAt] = entry.createdAt - it[updatedAt] = entry.updatedAt - it[retries] = entry.retries - it[lastAttempt] = entry.lastAttempt - it[lastError] = entry.lastError - it[deliveryMetadata] = entry.deliveryMetadata + val sql = """ + INSERT INTO outbox (id, message_type, payload, delivery_type, status, created_at, updated_at, retries, last_attempt, last_error, delivery_metadata) + VALUES (?::uuid, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb) + ON CONFLICT (id) DO UPDATE SET + status = EXCLUDED.status, + updated_at = EXCLUDED.updated_at, + retries = EXCLUDED.retries, + last_attempt = EXCLUDED.last_attempt, + last_error = EXCLUDED.last_error, + delivery_metadata = EXCLUDED.delivery_metadata + """.trimIndent() + + connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.setString(1, entry.outboxId.raw.toString()) + stmt.setString(2, entry.messageType) + stmt.setString(3, entry.payload) + stmt.setString(4, entry.deliveryType) + stmt.setString(5, entry.status.name) + stmt.setTimestamp(6, Timestamp.from(entry.createdAt)) + stmt.setTimestamp(7, Timestamp.from(entry.updatedAt)) + stmt.setInt(8, entry.retries) + if (entry.lastAttempt != null) { + stmt.setTimestamp( + 9, + Timestamp.from(entry.lastAttempt), + ) + } else { + stmt.setNull(9, java.sql.Types.TIMESTAMP) + } + if (entry.lastError != null) stmt.setString(10, entry.lastError) else stmt.setNull(10, java.sql.Types.VARCHAR) + stmt.setString(11, entry.deliveryMetadata) + stmt.executeUpdate() } return entry } override fun claimPending(limit: Int): List { - return OutboxTable - .select(OutboxTable.columns) - .where { OutboxTable.status eq OutboxStatus.PENDING.name } - .orderBy(OutboxTable.createdAt to SortOrder.ASC) - .limit(limit) - .forUpdate(ForUpdateOption.PostgreSQL.ForUpdate(mode = ForUpdateOption.PostgreSQL.MODE.SKIP_LOCKED)) - .map { it.toOutboxEntry() } + val sql = """ + SELECT * FROM outbox + WHERE status = ? + ORDER BY created_at ASC + LIMIT ? + FOR UPDATE SKIP LOCKED + """.trimIndent() + + return connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.setString(1, OutboxStatus.PENDING.name) + stmt.setInt(2, limit) + stmt.executeQuery().use { rs -> + generateSequence { if (rs.next()) rs.toOutboxEntry() else null }.toList() + } + } } override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = persist(entry) override fun removeDeliveredBefore(time: Instant, limit: Int): Int { val sql = """ - DELETE FROM ${OutboxTable.tableName} WHERE ${OutboxTable.id.name} IN ( - SELECT ${OutboxTable.id.name} FROM ${OutboxTable.tableName} - WHERE ${OutboxTable.status.name} = '${OutboxStatus.DELIVERED}' - AND ${OutboxTable.lastAttempt.name} < ? - ORDER BY ${OutboxTable.id.name} + DELETE FROM outbox WHERE id IN ( + SELECT id FROM outbox + WHERE status = ? + AND last_attempt < ? + ORDER BY id LIMIT ? FOR UPDATE SKIP LOCKED ) """.trimIndent() - val statement = TransactionManager.current().connection.prepareStatement(sql, false) - statement.fillParameters( - listOf( - OutboxTable.lastAttempt.columnType to time, - IntegerColumnType() to limit, - ), - ) - return statement.executeUpdate() + return connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.setString(1, OutboxStatus.DELIVERED.name) + stmt.setTimestamp(2, Timestamp.from(time)) + stmt.setInt(3, limit) + stmt.executeUpdate() + } } override fun findOldestCreatedAt(statuses: Set): Map { val result = statuses.associateWith { clock.instant() }.toMutableMap() - val minAlias = OutboxTable.createdAt.min().alias("min_created_at") - OutboxTable - .select(OutboxTable.status, minAlias) - .where { OutboxTable.status inList statuses.map { status -> status.name } } - .groupBy(OutboxTable.status) - .forEach { row -> - val s = OutboxStatus.from(row[OutboxTable.status]) - result[s] = requireNotNull(row[minAlias]) + val placeholders = statuses.joinToString(",") { "?" } + val sql = "SELECT status, MIN(created_at) AS min_created_at FROM outbox WHERE status IN ($placeholders) GROUP BY status" + + connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + statuses.forEachIndexed { i, s -> stmt.setString(i + 1, s.name) } + stmt.executeQuery().use { rs -> + while (rs.next()) { + val s = OutboxStatus.from(rs.getString("status")) + result[s] = rs.getTimestamp("min_created_at").toInstant() + } } + } return result } override fun countByStatuses(): Map { - val countAlias = OutboxTable.status.count().alias("count") - val counts = - OutboxTable - .select(OutboxTable.status, countAlias) - .groupBy(OutboxTable.status) - .associate { row -> OutboxStatus.from(row[OutboxTable.status]) to row[countAlias] } - return OutboxStatus.entries.associateWith { status -> counts[status] ?: 0L } + val sql = "SELECT status, COUNT(*) AS count FROM outbox GROUP BY status" + val counts = mutableMapOf() + + connectionProvider.getConnection().prepareStatement(sql).use { stmt -> + stmt.executeQuery().use { rs -> + while (rs.next()) { + counts[OutboxStatus.from(rs.getString("status"))] = rs.getLong("count") + } + } + } + return OutboxStatus.entries.associateWith { counts[it] ?: 0L } } - private fun ResultRow.toOutboxEntry(): OutboxEntry = OutboxEntry( - outboxId = this[OutboxTable.id], - messageType = this[OutboxTable.messageType], - payload = this[OutboxTable.payload], - deliveryType = this[OutboxTable.deliveryType], - status = OutboxStatus.from(this[OutboxTable.status]), - createdAt = this[OutboxTable.createdAt], - updatedAt = this[OutboxTable.updatedAt], - retries = this[OutboxTable.retries], - lastAttempt = this[OutboxTable.lastAttempt], - lastError = this[OutboxTable.lastError], - deliveryMetadata = this[OutboxTable.deliveryMetadata], + private fun ResultSet.toOutboxEntry(): OutboxEntry = OutboxEntry( + outboxId = OutboxId(UUID.fromString(getString("id"))), + messageType = getString("message_type"), + payload = getString("payload"), + deliveryType = getString("delivery_type"), + status = OutboxStatus.from(getString("status")), + createdAt = getTimestamp("created_at").toInstant(), + updatedAt = getTimestamp("updated_at").toInstant(), + retries = getInt("retries"), + lastAttempt = getTimestamp("last_attempt")?.toInstant(), + lastError = getString("last_error"), + deliveryMetadata = getString("delivery_metadata"), ) } diff --git a/okapi-spring-boot/build.gradle.kts b/okapi-spring-boot/build.gradle.kts index 5512268..e9b41fe 100644 --- a/okapi-spring-boot/build.gradle.kts +++ b/okapi-spring-boot/build.gradle.kts @@ -10,6 +10,7 @@ dependencies { compileOnly(libs.springContext) compileOnly(libs.springTx) + implementation(libs.springJdbc) compileOnly(libs.springBootAutoconfigure) // Validation annotations for @ConfigurationProperties classes @@ -31,10 +32,6 @@ dependencies { testImplementation(project(":okapi-postgres")) testImplementation(project(":okapi-mysql")) testImplementation(project(":okapi-http")) - testImplementation(libs.exposedCore) - testImplementation(libs.exposedJdbc) - testImplementation(libs.exposedJson) - testImplementation(libs.exposedJavaTime) testImplementation(libs.liquibaseCore) testImplementation(libs.testcontainersPostgresql) testImplementation(libs.postgresql) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 53268d8..272c83c 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -118,10 +118,12 @@ class OutboxAutoConfiguration( fun outboxPurgerScheduler( props: OutboxPurgerProperties, outboxStore: OutboxStore, + transactionManager: ObjectProvider, clock: ObjectProvider, ): OutboxPurgerScheduler { return OutboxPurgerScheduler( outboxStore = outboxStore, + transactionTemplate = transactionManager.getIfAvailable()?.let { TransactionTemplate(it) }, config = OutboxPurgerConfig( retention = props.retention, interval = props.interval, @@ -145,8 +147,10 @@ class OutboxAutoConfiguration( ) { @Bean @ConditionalOnMissingBean(OutboxStore::class) - fun outboxStore(clock: ObjectProvider): PostgresOutboxStore = - PostgresOutboxStore(clock = clock.getIfAvailable { Clock.systemUTC() }) + fun outboxStore(clock: ObjectProvider): PostgresOutboxStore = PostgresOutboxStore( + connectionProvider = SpringConnectionProvider(resolveDataSource(dataSources, primaryDataSource, okapiProperties)), + clock = clock.getIfAvailable { Clock.systemUTC() }, + ) @Bean("okapiPostgresLiquibase") @ConditionalOnClass(SpringLiquibase::class) @@ -168,8 +172,10 @@ class OutboxAutoConfiguration( ) { @Bean @ConditionalOnMissingBean(OutboxStore::class) - fun outboxStore(clock: ObjectProvider): MysqlOutboxStore = - MysqlOutboxStore(clock = clock.getIfAvailable { Clock.systemUTC() }) + fun outboxStore(clock: ObjectProvider): MysqlOutboxStore = MysqlOutboxStore( + connectionProvider = SpringConnectionProvider(resolveDataSource(dataSources, primaryDataSource, okapiProperties)), + clock = clock.getIfAvailable { Clock.systemUTC() }, + ) @Bean("okapiMysqlLiquibase") @ConditionalOnClass(SpringLiquibase::class) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt index bd62b4f..1251ed3 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt @@ -4,6 +4,7 @@ import com.softwaremill.okapi.core.OutboxPurger import com.softwaremill.okapi.core.OutboxPurgerConfig import com.softwaremill.okapi.core.OutboxStore import org.springframework.context.SmartLifecycle +import org.springframework.transaction.support.TransactionTemplate import java.time.Clock /** @@ -14,12 +15,14 @@ import java.time.Clock */ class OutboxPurgerScheduler( outboxStore: OutboxStore, + transactionTemplate: TransactionTemplate? = null, config: OutboxPurgerConfig = OutboxPurgerConfig(), clock: Clock = Clock.systemUTC(), ) : SmartLifecycle { private val purger = OutboxPurger( outboxStore = outboxStore, + transactionRunner = transactionTemplate?.let { SpringTransactionRunner(it) }, config = config, clock = clock, ) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProvider.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProvider.kt new file mode 100644 index 0000000..6e921dc --- /dev/null +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringConnectionProvider.kt @@ -0,0 +1,18 @@ +package com.softwaremill.okapi.springboot + +import com.softwaremill.okapi.core.ConnectionProvider +import org.springframework.jdbc.datasource.DataSourceUtils +import java.sql.Connection +import javax.sql.DataSource + +/** + * Spring-aware [ConnectionProvider] that retrieves the JDBC connection + * bound to the current Spring-managed transaction. + * + * Uses [DataSourceUtils.getConnection] — the standard Spring mechanism + * that works transparently with any [org.springframework.transaction.PlatformTransactionManager]: + * JPA, JDBC, jOOQ, MyBatis, Exposed, etc. + */ +class SpringConnectionProvider(private val dataSource: DataSource) : ConnectionProvider { + override fun getConnection(): Connection = DataSourceUtils.getConnection(dataSource) +} diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt index 18153ba..e7ae83c 100644 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxMysqlEndToEndTest.kt @@ -6,6 +6,8 @@ import com.github.tomakehurst.wiremock.client.WireMock.post import com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig +import com.mysql.cj.jdbc.MysqlDataSource +import com.softwaremill.okapi.core.ConnectionProvider import com.softwaremill.okapi.core.OutboxEntryProcessor import com.softwaremill.okapi.core.OutboxMessage import com.softwaremill.okapi.core.OutboxProcessor @@ -22,8 +24,6 @@ import liquibase.Liquibase import liquibase.database.DatabaseFactory import liquibase.database.jvm.JdbcConnection import liquibase.resource.ClassLoaderResourceAccessor -import org.jetbrains.exposed.v1.jdbc.Database -import org.jetbrains.exposed.v1.jdbc.transactions.transaction import org.testcontainers.containers.MySQLContainer import java.sql.DriverManager import java.time.Clock @@ -36,17 +36,18 @@ class OutboxMysqlEndToEndTest : lateinit var store: MysqlOutboxStore lateinit var publisher: OutboxPublisher lateinit var processor: OutboxProcessor + lateinit var jdbc: TestJdbcConnectionProvider beforeSpec { mysql.start() wiremock.start() - Database.connect( - url = mysql.jdbcUrl, - driver = mysql.driverClassName, - user = mysql.username, - password = mysql.password, - ) + val dataSource = MysqlDataSource().apply { + setURL(mysql.jdbcUrl) + user = mysql.username + setPassword(mysql.password) + } + jdbc = TestJdbcConnectionProvider(dataSource) val connection = DriverManager.getConnection(mysql.jdbcUrl, mysql.username, mysql.password) val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection)) @@ -55,7 +56,7 @@ class OutboxMysqlEndToEndTest : connection.close() val clock = Clock.systemUTC() - store = MysqlOutboxStore(clock) + store = MysqlOutboxStore(jdbc, clock) publisher = OutboxPublisher(store, clock) val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } @@ -71,7 +72,7 @@ class OutboxMysqlEndToEndTest : beforeEach { wiremock.resetAll() - transaction { exec("DELETE FROM outbox") } + jdbc.withTransaction { jdbc.getConnection().createStatement().use { it.execute("DELETE FROM outbox") } } } given("a message published within a transaction") { @@ -81,7 +82,7 @@ class OutboxMysqlEndToEndTest : .willReturn(aResponse().withStatus(200)), ) - transaction { + jdbc.withTransaction { publisher.publish( OutboxMessage("order.created", """{"orderId":"abc-123"}"""), httpDeliveryInfo { @@ -91,10 +92,10 @@ class OutboxMysqlEndToEndTest : ) } - transaction { processor.processNext() } + jdbc.withTransaction { processor.processNext() } val requests = wiremock.findAll(postRequestedFor(urlEqualTo("/api/notify"))) - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } then("WireMock receives exactly one POST request") { requests.size shouldBe 1 @@ -113,7 +114,7 @@ class OutboxMysqlEndToEndTest : .willReturn(aResponse().withStatus(500).withBody("Internal Server Error")), ) - transaction { + jdbc.withTransaction { publisher.publish( OutboxMessage("order.created", """{"orderId":"xyz-456"}"""), httpDeliveryInfo { @@ -123,9 +124,9 @@ class OutboxMysqlEndToEndTest : ) } - transaction { processor.processNext() } + jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } then("entry stays PENDING (retriable failure, retries remaining)") { counts[OutboxStatus.PENDING] shouldBe 1L @@ -141,7 +142,7 @@ class OutboxMysqlEndToEndTest : .willReturn(aResponse().withStatus(400).withBody("Bad Request")), ) - transaction { + jdbc.withTransaction { publisher.publish( OutboxMessage("order.created", """{"orderId":"err-789"}"""), httpDeliveryInfo { @@ -151,9 +152,9 @@ class OutboxMysqlEndToEndTest : ) } - transaction { processor.processNext() } + jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } then("entry is immediately FAILED (permanent failure)") { counts[OutboxStatus.FAILED] shouldBe 1L @@ -174,7 +175,7 @@ class OutboxMysqlEndToEndTest : ), ) - transaction { + jdbc.withTransaction { publisher.publish( OutboxMessage("order.created", """{"orderId":"net-000"}"""), httpDeliveryInfo { @@ -184,9 +185,9 @@ class OutboxMysqlEndToEndTest : ) } - transaction { processor.processNext() } + jdbc.withTransaction { processor.processNext() } - val counts = transaction { store.countByStatuses() } + val counts = jdbc.withTransaction { store.countByStatuses() } then("entry stays PENDING (retriable network failure)") { counts[OutboxStatus.PENDING] shouldBe 1L @@ -194,3 +195,27 @@ class OutboxMysqlEndToEndTest : } } }) + +private class TestJdbcConnectionProvider(private val dataSource: javax.sql.DataSource) : ConnectionProvider { + private val threadLocalConnection = ThreadLocal() + + override fun getConnection(): java.sql.Connection = threadLocalConnection.get() + ?: throw IllegalStateException("No connection bound to current thread. Use withTransaction { } in tests.") + + fun withTransaction(block: () -> T): T { + val conn = dataSource.connection + conn.autoCommit = false + threadLocalConnection.set(conn) + return try { + val result = block() + conn.commit() + result + } catch (e: Exception) { + conn.rollback() + throw e + } finally { + threadLocalConnection.remove() + conn.close() + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 1e1b7d3..93732e8 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -10,6 +10,7 @@ plugins { } include("okapi-core") +include("okapi-exposed") include("okapi-postgres") include("okapi-mysql") include("okapi-http")