Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class OrderService(
}
```

Autoconfiguration handles scheduling, retries, and delivery automatically.
Autoconfiguration handles scheduling, retries, and delivery automatically. For Micrometer metrics, also add `okapi-micrometer` — see [Observability](#observability).

**Using Kafka instead of HTTP?** Swap the deliverer bean and delivery info:

Expand Down Expand Up @@ -95,6 +95,29 @@ Okapi implements the [transactional outbox pattern](https://softwaremill.com/mic
- **Concurrent processing** — multiple processors can run in parallel using `FOR UPDATE SKIP LOCKED`, so messages are never processed twice simultaneously.
- **Delivery result classification** — each transport classifies errors as `Success`, `RetriableFailure`, or `PermanentFailure`. For example, HTTP 429 is retriable while HTTP 400 is permanent.

## Observability

Add `okapi-micrometer` alongside `okapi-spring-boot` (from the Quick Start above) to get Micrometer metrics:

```kotlin
implementation("com.softwaremill.okapi:okapi-micrometer")
```

With Spring Boot Actuator and a Prometheus registry (`micrometer-registry-prometheus`) on the classpath, metrics are automatically exposed on `/actuator/prometheus`. They are also visible via `/actuator/metrics`.

| Metric | Type | Description |
|--------|------|-------------|
| `okapi.entries.delivered` | Counter | Successfully delivered entries |
| `okapi.entries.retry.scheduled` | Counter | Failed attempts rescheduled for retry |
| `okapi.entries.failed` | Counter | Permanently failed entries |
| `okapi.batch.duration` | Timer | Processing time per batch |
| `okapi.entries.count` | Gauge | Current entry count (tag: `status=pending\|delivered\|failed`) |
| `okapi.entries.lag.seconds` | Gauge | Age of oldest entry in seconds (tag: `status`) |

**Without Spring Boot:** create `MicrometerOutboxListener` and `MicrometerOutboxMetrics` manually and pass a `MeterRegistry`. `MicrometerOutboxMetrics` requires a `TransactionRunner` for Exposed-backed stores — see the class KDoc for details.

**Custom listener:** implement `OutboxProcessorListener` to react to delivery events (logging, alerting, custom metrics). `OutboxProcessor` accepts a single listener; to combine multiple, implement a composite that delegates to each.

## Modules

```mermaid
Expand All @@ -103,9 +126,11 @@ graph BT
MY[okapi-mysql] --> CORE
HTTP[okapi-http] --> CORE
KAFKA[okapi-kafka] --> CORE
MICRO[okapi-micrometer] --> CORE
SPRING[okapi-spring-boot] --> CORE
SPRING -.->|compileOnly| PG
SPRING -.->|compileOnly| MY
SPRING -.->|compileOnly| MICRO
BOM[okapi-bom]

style CORE fill:#4a9eff,color:#fff
Expand All @@ -119,7 +144,8 @@ graph BT
| `okapi-mysql` | MySQL 8+ storage via Exposed ORM |
| `okapi-http` | HTTP webhook delivery (JDK HttpClient) |
| `okapi-kafka` | Kafka topic publishing |
| `okapi-spring-boot` | Spring Boot autoconfiguration (auto-detects store and transports) |
| `okapi-micrometer` | Micrometer metrics (counters, timers, gauges) |
| `okapi-spring-boot` | Spring Boot autoconfiguration (auto-detects store, transports, and metrics) |
| `okapi-bom` | Bill of Materials for version alignment |

## Compatibility
Expand Down
3 changes: 3 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ wiremock = "3.13.2"
slf4j = "2.0.17"
assertj = "3.27.7"
h2 = "2.4.240"
micrometer = "1.15.6"

[libraries]
kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" }
Expand Down Expand Up @@ -44,6 +45,8 @@ springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoc
springBootStarterValidation = { module = "org.springframework.boot:spring-boot-starter-validation", version.ref = "springBoot" }
springBootTest = { module = "org.springframework.boot:spring-boot-test", version.ref = "springBoot" }
assertjCore = { module = "org.assertj:assertj-core", version.ref = "assertj" }
micrometerCore = { module = "io.micrometer:micrometer-core", version.ref = "micrometer" }
micrometerTest = { module = "io.micrometer:micrometer-test", version.ref = "micrometer" }
wiremock = { module = "org.wiremock:wiremock", version.ref = "wiremock" }
slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4jSimple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
Expand Down
1 change: 1 addition & 0 deletions okapi-bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ dependencies {
api(project(":okapi-http"))
api(project(":okapi-kafka"))
api(project(":okapi-spring-boot"))
api(project(":okapi-micrometer"))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.softwaremill.okapi.core

import java.time.Duration

/**
* Outcome of processing a single [OutboxEntry], emitted by [OutboxProcessor]
* to [OutboxProcessorListener].
*/
sealed interface OutboxProcessingEvent {
val entry: OutboxEntry

/** Wall-clock duration of the delivery attempt, excluding the database update. */
val duration: Duration

data class Delivered(override val entry: OutboxEntry, override val duration: Duration) : OutboxProcessingEvent
data class RetryScheduled(override val entry: OutboxEntry, override val duration: Duration, val error: String) : OutboxProcessingEvent
data class Failed(override val entry: OutboxEntry, override val duration: Duration, val error: String) : OutboxProcessingEvent
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,61 @@
package com.softwaremill.okapi.core

import org.slf4j.LoggerFactory
import java.time.Clock
import java.time.Duration

/**
* Orchestrates a single processing cycle: claims pending entries from [OutboxStore],
* delegates each to [OutboxEntryProcessor], and persists the result.
*
* Transaction management is the caller's responsibility.
* An optional [OutboxProcessorListener] is notified after each entry and after the
* full batch. Exceptions in the listener are caught and logged — they never break
* processing. Transaction management is the caller's responsibility.
*/
class OutboxProcessor(
private val store: OutboxStore,
private val entryProcessor: OutboxEntryProcessor,
private val listener: OutboxProcessorListener? = null,
private val clock: Clock = Clock.systemUTC(),
) {
fun processNext(limit: Int = 10) {
val batchStart = clock.instant()
var count = 0
store.claimPending(limit).forEach { entry ->
val entryStart = clock.instant()
val updated = entryProcessor.process(entry)
val deliveryDuration = Duration.between(entryStart, clock.instant())
store.updateAfterProcessing(updated)
count++
notifyEntry(updated, deliveryDuration)
}
notifyBatch(count, Duration.between(batchStart, clock.instant()))
}

private fun notifyEntry(updated: OutboxEntry, duration: Duration) {
if (listener == null) return
try {
val event = when (updated.status) {
OutboxStatus.DELIVERED -> OutboxProcessingEvent.Delivered(updated, duration)
OutboxStatus.PENDING -> OutboxProcessingEvent.RetryScheduled(updated, duration, updated.lastError ?: "")
OutboxStatus.FAILED -> OutboxProcessingEvent.Failed(updated, duration, updated.lastError ?: "")
}
listener.onEntryProcessed(event)
} catch (e: Exception) {
logger.warn("OutboxProcessorListener.onEntryProcessed failed", e)
}
}

private fun notifyBatch(count: Int, duration: Duration) {
if (listener == null) return
try {
listener.onBatchProcessed(count, duration)
} catch (e: Exception) {
logger.warn("OutboxProcessorListener.onBatchProcessed failed", e)
}
}

companion object {
private val logger = LoggerFactory.getLogger(OutboxProcessor::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.softwaremill.okapi.core

import java.time.Duration

/**
* Callback interface for observing [OutboxProcessor] activity.
*
* Default no-op implementations allow consumers to override only the
* methods they care about. Exceptions thrown by implementations are
* caught and logged — they never break processing.
*
* [OutboxProcessor] accepts a single listener. To combine multiple listeners,
* implement a composite that delegates to each.
*/
interface OutboxProcessorListener {
/** Called after each entry is processed (delivered, retried, or failed). */
fun onEntryProcessed(event: OutboxProcessingEvent) {}

/** Called after a full batch completes (even if empty). */
fun onBatchProcessed(processedCount: Int, duration: Duration) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.softwaremill.okapi.core
import io.kotest.core.spec.style.BehaviorSpec
import io.kotest.matchers.shouldBe
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset

Expand Down Expand Up @@ -123,4 +124,190 @@ class OutboxProcessorTest :
}
}
}

given("processNext() with a listener — delivery succeeds") {
val events = mutableListOf<OutboxProcessingEvent>()
var batchCount: Int? = null
var batchDuration: Duration? = null

val listener = object : OutboxProcessorListener {
override fun onEntryProcessed(event: OutboxProcessingEvent) {
events += event
}
override fun onBatchProcessed(processedCount: Int, duration: Duration) {
batchCount = processedCount
batchDuration = duration
}
}

`when`("delivery succeeds") {
pendingEntries = listOf(stubEntry())
val entryProcessor = OutboxEntryProcessor(
deliverer = stubDeliverer(DeliveryResult.Success),
retryPolicy = RetryPolicy(maxRetries = 3),
clock = fixedClock,
)
OutboxProcessor(store, entryProcessor, listener = listener, clock = fixedClock)
.processNext()
val capturedEvents = events.toList()
val capturedProcessed = processedEntries.toList()

then("listener receives Delivered event") {
capturedEvents.size shouldBe 1
capturedEvents.first() shouldBe OutboxProcessingEvent.Delivered(
entry = capturedProcessed.first(),
duration = Duration.ZERO,
)
}
then("batch callback is invoked with count=1") {
batchCount shouldBe 1
batchDuration shouldBe Duration.ZERO
}
}
}

given("processNext() with a listener — delivery returns RetriableFailure") {
val events = mutableListOf<OutboxProcessingEvent>()

val listener = object : OutboxProcessorListener {
override fun onEntryProcessed(event: OutboxProcessingEvent) {
events += event
}
}

`when`("retries remain") {
pendingEntries = listOf(stubEntry())
val entryProcessor = OutboxEntryProcessor(
deliverer = stubDeliverer(DeliveryResult.RetriableFailure("timeout")),
retryPolicy = RetryPolicy(maxRetries = 3),
clock = fixedClock,
)
OutboxProcessor(store, entryProcessor, listener = listener, clock = fixedClock)
.processNext()
val capturedEvents = events.toList()
val capturedProcessed = processedEntries.toList()

then("listener receives RetryScheduled event with error") {
capturedEvents.size shouldBe 1
capturedEvents.first() shouldBe OutboxProcessingEvent.RetryScheduled(
entry = capturedProcessed.first(),
duration = Duration.ZERO,
error = "timeout",
)
}
}
}

given("processNext() with a listener — delivery returns PermanentFailure") {
val events = mutableListOf<OutboxProcessingEvent>()

val listener = object : OutboxProcessorListener {
override fun onEntryProcessed(event: OutboxProcessingEvent) {
events += event
}
}

`when`("delivery returns PermanentFailure") {
pendingEntries = listOf(stubEntry())
val entryProcessor = OutboxEntryProcessor(
deliverer = stubDeliverer(DeliveryResult.PermanentFailure("bad request")),
retryPolicy = RetryPolicy(maxRetries = 3),
clock = fixedClock,
)
OutboxProcessor(store, entryProcessor, listener = listener, clock = fixedClock)
.processNext()
val capturedEvents = events.toList()
val capturedProcessed = processedEntries.toList()

then("listener receives Failed event with error") {
capturedEvents.size shouldBe 1
capturedEvents.first() shouldBe OutboxProcessingEvent.Failed(
entry = capturedProcessed.first(),
duration = Duration.ZERO,
error = "bad request",
)
}
}
}

given("processNext() when listener throws") {
val throwingListener = object : OutboxProcessorListener {
override fun onEntryProcessed(event: OutboxProcessingEvent) {
throw RuntimeException("listener exploded")
}
override fun onBatchProcessed(processedCount: Int, duration: Duration) {
throw RuntimeException("batch listener exploded")
}
}

`when`("entry notification throws") {
pendingEntries = listOf(stubEntry())
val entryProcessor = OutboxEntryProcessor(
deliverer = stubDeliverer(DeliveryResult.Success),
retryPolicy = RetryPolicy(maxRetries = 3),
clock = fixedClock,
)
OutboxProcessor(store, entryProcessor, listener = throwingListener, clock = fixedClock)
.processNext()
val capturedProcessed = processedEntries.toList()

then("entry is still processed and persisted") {
capturedProcessed.size shouldBe 1
capturedProcessed.first().status shouldBe OutboxStatus.DELIVERED
}
}
}

given("processNext() with no listener (null)") {
`when`("entries are processed") {
pendingEntries = listOf(stubEntry())
val entryProcessor = OutboxEntryProcessor(
deliverer = stubDeliverer(DeliveryResult.Success),
retryPolicy = RetryPolicy(maxRetries = 3),
clock = fixedClock,
)
OutboxProcessor(store, entryProcessor).processNext()
val capturedProcessed = processedEntries.toList()

then("processing works without NPE") {
capturedProcessed.size shouldBe 1
capturedProcessed.first().status shouldBe OutboxStatus.DELIVERED
}
}
}

given("processNext() with a listener — RetriableFailure exhausts retries") {
val events = mutableListOf<OutboxProcessingEvent>()

val listener = object : OutboxProcessorListener {
override fun onEntryProcessed(event: OutboxProcessingEvent) {
events += event
}
}

`when`("retries are exhausted (maxRetries=0)") {
pendingEntries = listOf(stubEntry())
val entryProcessor = OutboxEntryProcessor(
deliverer = stubDeliverer(DeliveryResult.RetriableFailure("still failing")),
retryPolicy = RetryPolicy(maxRetries = 0),
clock = fixedClock,
)
OutboxProcessor(store, entryProcessor, listener = listener, clock = fixedClock)
.processNext()
val capturedEvents = events.toList()
val capturedProcessed = processedEntries.toList()

then("entry is marked FAILED") {
capturedProcessed.first().status shouldBe OutboxStatus.FAILED
}
then("listener receives Failed event, not RetryScheduled") {
capturedEvents.size shouldBe 1
capturedEvents.first() shouldBe OutboxProcessingEvent.Failed(
entry = capturedProcessed.first(),
duration = Duration.ZERO,
error = "still failing",
)
}
}
}
})
Loading