Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ springOutboxPublisher.publish(

**Using MySQL instead of PostgreSQL?** Replace `okapi-postgres` with `okapi-mysql` in your dependencies — no code changes needed.

> **Note:** `okapi-postgres` and `okapi-mysql` require Exposed ORM dependencies in your project.
> Spring and Kafka versions are not forced by okapi — you control them.
> **Note:** Spring and Kafka versions are not forced by okapi — you control them.
> Okapi uses plain JDBC internally — it works with any `PlatformTransactionManager` (JPA, JDBC, jOOQ, Exposed, etc.).

## How It Works

Expand All @@ -103,6 +103,7 @@ graph BT
MY[okapi-mysql] --> CORE
HTTP[okapi-http] --> CORE
KAFKA[okapi-kafka] --> CORE
EXP[okapi-exposed] --> CORE
SPRING[okapi-spring-boot] --> CORE
SPRING -.->|compileOnly| PG
SPRING -.->|compileOnly| MY
Expand All @@ -114,9 +115,10 @@ graph BT

| Module | Purpose |
|--------|---------|
| `okapi-core` | Transport/storage-agnostic orchestration, scheduling, retry policy |
| `okapi-postgres` | PostgreSQL storage via Exposed ORM (`FOR UPDATE SKIP LOCKED`) |
| `okapi-mysql` | MySQL 8+ storage via Exposed ORM |
| `okapi-core` | Transport/storage-agnostic orchestration, scheduling, retry policy, `ConnectionProvider` interface |
| `okapi-exposed` | Exposed ORM integration — `ExposedConnectionProvider`, `ExposedTransactionRunner`, `ExposedTransactionContextValidator` |
| `okapi-postgres` | PostgreSQL storage via plain JDBC (`FOR UPDATE SKIP LOCKED`) |
| `okapi-mysql` | MySQL 8+ storage via plain JDBC |
| `okapi-http` | HTTP webhook delivery (JDK HttpClient) |
| `okapi-kafka` | Kafka topic publishing |
| `okapi-spring-boot` | Spring Boot autoconfiguration (auto-detects store and transports) |
Expand All @@ -129,7 +131,7 @@ graph BT
| Java | 21+ | Required |
| Spring Boot | 3.5.x, 4.0.x | `okapi-spring-boot` module |
| Kafka Clients | 3.9.x, 4.x | `okapi-kafka` — you provide `kafka-clients` |
| Exposed | 1.x | `okapi-postgres`, `okapi-mysql`you provide Exposed |
| Exposed | 1.x | `okapi-exposed` modulefor Ktor/standalone apps |

## Build

Expand Down
1 change: 1 addition & 0 deletions okapi-bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ description = "BOM for consistent versioning of Okapi modules"
dependencies {
constraints {
api(project(":okapi-core"))
api(project(":okapi-exposed"))
api(project(":okapi-postgres"))
api(project(":okapi-mysql"))
api(project(":okapi-http"))
Expand Down
3 changes: 0 additions & 3 deletions okapi-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ description = "Core outbox abstractions and processing engine"

dependencies {
implementation(libs.slf4jApi)
compileOnly(libs.exposedJdbc)
testImplementation(libs.kotestRunnerJunit5)
testImplementation(libs.kotestAssertionsCore)
testImplementation(libs.exposedJdbc)
testImplementation(libs.h2)
testRuntimeOnly(libs.slf4jSimple)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.softwaremill.okapi.core

import java.sql.Connection

/**
* Provides a JDBC [Connection] from the current transactional context.
*
* Implementations bridge okapi's [OutboxStore] with the caller's transaction mechanism:
* - `okapi-spring-boot`: uses `DataSourceUtils.getConnection()` — works with JPA, JDBC, jOOQ, MyBatis, Exposed
* - `okapi-exposed`: uses Exposed's `TransactionManager.current().connection` — for Ktor/standalone Exposed
* - Standalone: user-provided lambda wrapping a `DataSource` or `ThreadLocal<Connection>`
*
* The returned connection is **borrowed** from the current transaction — the caller must NOT close it.
*/
fun interface ConnectionProvider {
fun getConnection(): Connection
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicBoolean
*/
class OutboxPurger(
private val outboxStore: OutboxStore,
private val transactionRunner: TransactionRunner? = null,
private val config: OutboxPurgerConfig = OutboxPurgerConfig(),
private val clock: Clock = Clock.systemUTC(),
) {
Expand Down Expand Up @@ -58,7 +59,9 @@ class OutboxPurger(
var totalDeleted = 0
var batches = 0
do {
val deleted = outboxStore.removeDeliveredBefore(cutoff, config.batchSize)
val deleted = transactionRunner?.runInTransaction {
outboxStore.removeDeliveredBefore(cutoff, config.batchSize)
} ?: outboxStore.removeDeliveredBefore(cutoff, config.batchSize)
totalDeleted += deleted
batches++
} while (deleted == config.batchSize && batches < MAX_BATCHES_PER_TICK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ package com.softwaremill.okapi.core
* Checks whether the current execution context is inside an active read-write transaction.
*
* Framework-specific modules provide implementations:
* - `okapi-spring`: [SpringTransactionContextValidator][com.softwaremill.okapi.springboot.SpringTransactionContextValidator]
* — checks via `TransactionSynchronizationManager`
* - `okapi-core`: [ExposedTransactionContextValidator] — checks via Exposed's `TransactionManager.currentOrNull()`
* - `okapi-spring-boot`: `SpringTransactionContextValidator` — checks via `TransactionSynchronizationManager`
* - `okapi-exposed`: `ExposedTransactionContextValidator` — checks via Exposed's `TransactionManager.currentOrNull()`
* - Standalone: no-op (always returns true)
*/
interface TransactionContextValidator {
Expand Down
15 changes: 15 additions & 0 deletions okapi-exposed/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
plugins {
id("buildsrc.convention.kotlin-jvm")
id("buildsrc.convention.publish")
}

description = "Exposed ORM integration — ConnectionProvider, TransactionRunner, TransactionContextValidator"

dependencies {
api(project(":okapi-core"))
implementation(libs.exposedJdbc)

testImplementation(libs.kotestRunnerJunit5)
testImplementation(libs.kotestAssertionsCore)
testImplementation(libs.h2)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.softwaremill.okapi.exposed

import com.softwaremill.okapi.core.ConnectionProvider
import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager
import java.sql.Connection

/**
* Exposed implementation of [ConnectionProvider].
*
* Retrieves the JDBC [Connection] from the current Exposed transaction.
* Use this when your application manages transactions via Exposed's
* `transaction(database) { }` blocks (e.g., Ktor + Exposed apps).
*
* The returned connection is **borrowed** from Exposed's active transaction —
* the caller must NOT close it.
*/
class ExposedConnectionProvider : ConnectionProvider {
override fun getConnection(): Connection = TransactionManager.current().connection.connection as Connection
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.softwaremill.okapi.core
package com.softwaremill.okapi.exposed

import com.softwaremill.okapi.core.TransactionContextValidator
import org.jetbrains.exposed.v1.jdbc.Database
import org.jetbrains.exposed.v1.jdbc.transactions.currentOrNull
import org.jetbrains.exposed.v1.jdbc.transactions.transactionManager
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.softwaremill.okapi.exposed

import com.softwaremill.okapi.core.TransactionRunner
import org.jetbrains.exposed.v1.jdbc.Database
import org.jetbrains.exposed.v1.jdbc.transactions.transaction

/**
* Exposed implementation of [TransactionRunner].
*
* Wraps the block in Exposed's `transaction(database) { }`.
* Used by the outbox scheduler/processor when running outside of
* an existing transactional context (e.g., background processing thread).
*
* @param database The [Database] instance where the outbox table resides.
*/
class ExposedTransactionRunner(
private val database: Database,
) : TransactionRunner {
override fun <T> runInTransaction(block: () -> T): T = transaction(database) { block() }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.softwaremill.okapi.core
package com.softwaremill.okapi.exposed

import io.kotest.core.spec.style.BehaviorSpec
import io.kotest.matchers.shouldBe
Expand Down
7 changes: 0 additions & 7 deletions okapi-integration-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@ dependencies {
testImplementation(libs.postgresql)
testImplementation(libs.mysql)

// Exposed ORM (for transaction blocks and DB queries in tests)
testImplementation(libs.exposedCore)
testImplementation(libs.exposedJdbc)
testImplementation(libs.exposedJson)
testImplementation(libs.exposedJavaTime)
testImplementation(libs.exposedSpringTransaction)

// Liquibase (schema migrations in tests)
testImplementation(libs.liquibaseCore)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import com.softwaremill.okapi.core.OutboxProcessor
import com.softwaremill.okapi.core.OutboxStatus
import com.softwaremill.okapi.core.OutboxStore
import com.softwaremill.okapi.core.RetryPolicy
import com.softwaremill.okapi.test.support.JdbcConnectionProvider
import com.softwaremill.okapi.test.support.RecordingMessageDeliverer
import io.kotest.assertions.withClue
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.maps.shouldContain
import io.kotest.matchers.shouldBe
import org.jetbrains.exposed.v1.jdbc.transactions.transaction
import java.sql.Connection
import java.time.Clock
import java.time.Instant
Expand All @@ -41,15 +41,18 @@ private fun createTestEntry(index: Int, now: Instant = Instant.parse("2024-01-01

fun FunSpec.concurrentClaimTests(
dbName: String,
jdbcProvider: () -> JdbcConnectionProvider,
storeFactory: () -> OutboxStore,
startDb: () -> Unit,
stopDb: () -> Unit,
truncate: () -> Unit,
) {
lateinit var store: OutboxStore
lateinit var jdbc: JdbcConnectionProvider

beforeSpec {
startDb()
jdbc = jdbcProvider()
store = storeFactory()
}

Expand All @@ -63,7 +66,7 @@ fun FunSpec.concurrentClaimTests(

test("[$dbName] concurrent claimPending with held locks produces disjoint sets") {
// Insert 20 entries
val allIds = transaction {
val allIds = jdbc.withTransaction {
(0 until 20).map { i ->
val entry = createTestEntry(i)
store.persist(entry)
Expand All @@ -80,7 +83,7 @@ fun FunSpec.concurrentClaimTests(
// next-key locks cause SKIP LOCKED to skip more rows than actually locked.
val threadA = Thread.ofVirtual().name("processor-A").start {
try {
transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) {
jdbc.withTransaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) {
val claimed = store.claimPending(10)
claimedByA.complete(claimed.map { it.outboxId })
lockAcquired.countDown()
Expand All @@ -97,7 +100,7 @@ fun FunSpec.concurrentClaimTests(

// Main thread: claim remaining entries (SKIP LOCKED should skip A's locked rows)
val idsA = claimedByA.get(10, TimeUnit.SECONDS)
val idsB = transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) {
val idsB = jdbc.withTransaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) {
store.claimPending(10)
}.map { it.outboxId }

Expand All @@ -123,7 +126,7 @@ fun FunSpec.concurrentClaimTests(
val fixedClock = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)

// Insert 50 entries
transaction {
jdbc.withTransaction {
(0 until 50).forEach { i -> store.persist(createTestEntry(i)) }
}

Expand All @@ -137,7 +140,7 @@ fun FunSpec.concurrentClaimTests(
CompletableFuture.supplyAsync(
{
barrier.await(10, TimeUnit.SECONDS)
transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) {
jdbc.withTransaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) {
OutboxProcessor(store, entryProcessor).processNext(limit = 50)
}
},
Expand All @@ -156,7 +159,7 @@ fun FunSpec.concurrentClaimTests(
}

// Verify DB state
val counts = transaction { store.countByStatuses() }
val counts = jdbc.withTransaction { store.countByStatuses() }
withClue("DB state after concurrent processing: $counts") {
counts shouldContain (OutboxStatus.DELIVERED to 50L)
counts shouldContain (OutboxStatus.PENDING to 0L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ class MysqlConcurrentClaimTest : FunSpec({

concurrentClaimTests(
dbName = "mysql",
storeFactory = { MysqlOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) },
jdbcProvider = { db.jdbc },
storeFactory = { MysqlOutboxStore(db.jdbc, Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) },
startDb = { db.start() },
stopDb = { db.stop() },
truncate = { db.truncate() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ class PostgresConcurrentClaimTest : FunSpec({

concurrentClaimTests(
dbName = "postgres",
storeFactory = { PostgresOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) },
jdbcProvider = { db.jdbc },
storeFactory = { PostgresOutboxStore(db.jdbc, Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) },
startDb = { db.start() },
stopDb = { db.stop() },
truncate = { db.truncate() },
Expand Down
Loading