From 33a4b832e63e6a3778d3b518aaf0cc723d2d59df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 8 Apr 2026 11:49:01 +0200 Subject: [PATCH 01/11] build: scaffold okapi-micrometer module (KOJAK-44) Add Micrometer to version catalog, register okapi-micrometer module in settings and BOM. Module depends on okapi-core with micrometer-core as compileOnly. --- gradle/libs.versions.toml | 3 +++ okapi-bom/build.gradle.kts | 1 + okapi-micrometer/build.gradle.kts | 13 +++++++++++++ settings.gradle.kts | 1 + 4 files changed, 18 insertions(+) create mode 100644 okapi-micrometer/build.gradle.kts diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 61d40fe..dd3425e 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -16,6 +16,7 @@ wiremock = "3.13.2" slf4j = "2.0.17" assertj = "3.27.7" h2 = "2.4.240" +micrometer = "1.15.6" [libraries] kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" } @@ -44,6 +45,8 @@ springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoc springBootStarterValidation = { module = "org.springframework.boot:spring-boot-starter-validation", version.ref = "springBoot" } springBootTest = { module = "org.springframework.boot:spring-boot-test", version.ref = "springBoot" } assertjCore = { module = "org.assertj:assertj-core", version.ref = "assertj" } +micrometerCore = { module = "io.micrometer:micrometer-core", version.ref = "micrometer" } +micrometerTest = { module = "io.micrometer:micrometer-test", version.ref = "micrometer" } wiremock = { module = "org.wiremock:wiremock", version.ref = "wiremock" } slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } slf4jSimple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" } diff --git a/okapi-bom/build.gradle.kts b/okapi-bom/build.gradle.kts index 588ba56..03c6076 100644 --- a/okapi-bom/build.gradle.kts +++ b/okapi-bom/build.gradle.kts @@ -13,5 +13,6 @@ 
dependencies { api(project(":okapi-http")) api(project(":okapi-kafka")) api(project(":okapi-spring-boot")) + api(project(":okapi-micrometer")) } } diff --git a/okapi-micrometer/build.gradle.kts b/okapi-micrometer/build.gradle.kts new file mode 100644 index 0000000..939eda8 --- /dev/null +++ b/okapi-micrometer/build.gradle.kts @@ -0,0 +1,13 @@ +plugins { + id("buildsrc.convention.kotlin-jvm") +} + +dependencies { + implementation(project(":okapi-core")) + compileOnly(libs.micrometerCore) + + testImplementation(libs.kotestRunnerJunit5) + testImplementation(libs.kotestAssertionsCore) + testImplementation(libs.micrometerCore) + testImplementation(libs.micrometerTest) +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 1e1b7d3..832da0b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -17,5 +17,6 @@ include("okapi-kafka") include("okapi-spring-boot") include("okapi-bom") include("okapi-integration-tests") +include("okapi-micrometer") rootProject.name = "okapi" From c5963b2a00916f515a0f3accd565b780d3d5826c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 8 Apr 2026 11:55:34 +0200 Subject: [PATCH 02/11] feat(core): wire OutboxProcessorListener into OutboxProcessor (KOJAK-44) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OutboxProcessor accepts an optional listener and clock. After each entry is processed, it emits a sealed OutboxProcessingEvent (Delivered, Retried, Failed) with per-entry Duration. After the batch, it calls onBatchProcessed. Exceptions in the listener are caught and logged — they never break processing. 
--- .../okapi/core/OutboxProcessor.kt | 43 ++++- .../okapi/core/OutboxProcessorTest.kt | 152 ++++++++++++++++++ 2 files changed, 194 insertions(+), 1 deletion(-) diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt index b29d6cf..a9e4fb0 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt @@ -1,19 +1,60 @@ package com.softwaremill.okapi.core +import org.slf4j.LoggerFactory +import java.time.Clock +import java.time.Duration + /** * Orchestrates a single processing cycle: claims pending entries from [OutboxStore], * delegates each to [OutboxEntryProcessor], and persists the result. * - * Transaction management is the caller's responsibility. + * An optional [OutboxProcessorListener] is notified after each entry and after the + * full batch. Exceptions in the listener are caught and logged — they never break + * processing. Transaction management is the caller's responsibility. */ class OutboxProcessor( private val store: OutboxStore, private val entryProcessor: OutboxEntryProcessor, + private val listener: OutboxProcessorListener? 
= null, + private val clock: Clock = Clock.systemUTC(), ) { fun processNext(limit: Int = 10) { + val batchStart = clock.instant() + var count = 0 store.claimPending(limit).forEach { entry -> + val entryStart = clock.instant() val updated = entryProcessor.process(entry) store.updateAfterProcessing(updated) + count++ + notifyEntry(updated, Duration.between(entryStart, clock.instant())) } + notifyBatch(count, Duration.between(batchStart, clock.instant())) + } + + private fun notifyEntry(updated: OutboxEntry, duration: Duration) { + if (listener == null) return + try { + val event = when (updated.status) { + OutboxStatus.DELIVERED -> OutboxProcessingEvent.Delivered(updated, duration) + OutboxStatus.PENDING -> OutboxProcessingEvent.Retried(updated, duration, updated.lastError ?: "") + OutboxStatus.FAILED -> OutboxProcessingEvent.Failed(updated, duration, updated.lastError ?: "") + } + listener.onEntryProcessed(event) + } catch (e: Exception) { + logger.warn("OutboxProcessorListener.onEntryProcessed failed", e) + } + } + + private fun notifyBatch(count: Int, duration: Duration) { + if (listener == null) return + try { + listener.onBatchProcessed(count, duration) + } catch (e: Exception) { + logger.warn("OutboxProcessorListener.onBatchProcessed failed", e) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(OutboxProcessor::class.java) } } diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt index 9d5bb1b..9dbdd11 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt @@ -3,6 +3,7 @@ package com.softwaremill.okapi.core import io.kotest.core.spec.style.BehaviorSpec import io.kotest.matchers.shouldBe import java.time.Clock +import java.time.Duration import java.time.Instant import java.time.ZoneOffset @@ -123,4 
+124,155 @@ class OutboxProcessorTest : } } } + + given("processNext() with a listener — delivery succeeds") { + val events = mutableListOf() + var batchCount: Int? = null + var batchDuration: Duration? = null + + val listener = object : OutboxProcessorListener { + override fun onEntryProcessed(event: OutboxProcessingEvent) { + events += event + } + override fun onBatchProcessed(processedCount: Int, duration: Duration) { + batchCount = processedCount + batchDuration = duration + } + } + + `when`("delivery succeeds") { + pendingEntries = listOf(stubEntry()) + val entryProcessor = OutboxEntryProcessor( + deliverer = stubDeliverer(DeliveryResult.Success), + retryPolicy = RetryPolicy(maxRetries = 3), + clock = fixedClock, + ) + OutboxProcessor(store, entryProcessor, listener = listener, clock = fixedClock) + .processNext() + val capturedEvents = events.toList() + val capturedProcessed = processedEntries.toList() + + then("listener receives Delivered event") { + capturedEvents.size shouldBe 1 + capturedEvents.first() shouldBe OutboxProcessingEvent.Delivered( + entry = capturedProcessed.first(), + duration = Duration.ZERO, + ) + } + then("batch callback is invoked with count=1") { + batchCount shouldBe 1 + batchDuration shouldBe Duration.ZERO + } + } + } + + given("processNext() with a listener — delivery returns RetriableFailure") { + val events = mutableListOf() + + val listener = object : OutboxProcessorListener { + override fun onEntryProcessed(event: OutboxProcessingEvent) { + events += event + } + } + + `when`("retries remain") { + pendingEntries = listOf(stubEntry()) + val entryProcessor = OutboxEntryProcessor( + deliverer = stubDeliverer(DeliveryResult.RetriableFailure("timeout")), + retryPolicy = RetryPolicy(maxRetries = 3), + clock = fixedClock, + ) + OutboxProcessor(store, entryProcessor, listener = listener, clock = fixedClock) + .processNext() + val capturedEvents = events.toList() + val capturedProcessed = processedEntries.toList() + + then("listener 
receives Retried event with error") { + capturedEvents.size shouldBe 1 + capturedEvents.first() shouldBe OutboxProcessingEvent.Retried( + entry = capturedProcessed.first(), + duration = Duration.ZERO, + error = "timeout", + ) + } + } + } + + given("processNext() with a listener — delivery returns PermanentFailure") { + val events = mutableListOf() + + val listener = object : OutboxProcessorListener { + override fun onEntryProcessed(event: OutboxProcessingEvent) { + events += event + } + } + + `when`("delivery returns PermanentFailure") { + pendingEntries = listOf(stubEntry()) + val entryProcessor = OutboxEntryProcessor( + deliverer = stubDeliverer(DeliveryResult.PermanentFailure("bad request")), + retryPolicy = RetryPolicy(maxRetries = 3), + clock = fixedClock, + ) + OutboxProcessor(store, entryProcessor, listener = listener, clock = fixedClock) + .processNext() + val capturedEvents = events.toList() + val capturedProcessed = processedEntries.toList() + + then("listener receives Failed event with error") { + capturedEvents.size shouldBe 1 + capturedEvents.first() shouldBe OutboxProcessingEvent.Failed( + entry = capturedProcessed.first(), + duration = Duration.ZERO, + error = "bad request", + ) + } + } + } + + given("processNext() when listener throws") { + val throwingListener = object : OutboxProcessorListener { + override fun onEntryProcessed(event: OutboxProcessingEvent) { + throw RuntimeException("listener exploded") + } + override fun onBatchProcessed(processedCount: Int, duration: Duration) { + throw RuntimeException("batch listener exploded") + } + } + + `when`("entry notification throws") { + pendingEntries = listOf(stubEntry()) + val entryProcessor = OutboxEntryProcessor( + deliverer = stubDeliverer(DeliveryResult.Success), + retryPolicy = RetryPolicy(maxRetries = 3), + clock = fixedClock, + ) + OutboxProcessor(store, entryProcessor, listener = throwingListener, clock = fixedClock) + .processNext() + val capturedProcessed = processedEntries.toList() + + 
then("entry is still processed and persisted") { + capturedProcessed.size shouldBe 1 + capturedProcessed.first().status shouldBe OutboxStatus.DELIVERED + } + } + } + + given("processNext() with no listener (null)") { + `when`("entries are processed") { + pendingEntries = listOf(stubEntry()) + val entryProcessor = OutboxEntryProcessor( + deliverer = stubDeliverer(DeliveryResult.Success), + retryPolicy = RetryPolicy(maxRetries = 3), + clock = fixedClock, + ) + OutboxProcessor(store, entryProcessor).processNext() + val capturedProcessed = processedEntries.toList() + + then("processing works without NPE") { + capturedProcessed.size shouldBe 1 + capturedProcessed.first().status shouldBe OutboxStatus.DELIVERED + } + } + } }) From a5c45687c5720d819884d4371406934deb5fb219 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 8 Apr 2026 12:01:08 +0200 Subject: [PATCH 03/11] feat(micrometer): add MicrometerOutboxListener (KOJAK-44) Implements OutboxProcessorListener with Micrometer counters for delivered/retried/failed entries and a timer for batch duration. 
--- .../micrometer/MicrometerOutboxListener.kt | 38 +++++++++++ .../MicrometerOutboxListenerTest.kt | 66 +++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt create mode 100644 okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt diff --git a/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt new file mode 100644 index 0000000..6a3b6d9 --- /dev/null +++ b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt @@ -0,0 +1,38 @@ +package com.softwaremill.okapi.micrometer + +import com.softwaremill.okapi.core.OutboxProcessingEvent +import com.softwaremill.okapi.core.OutboxProcessingEvent.Delivered +import com.softwaremill.okapi.core.OutboxProcessingEvent.Failed +import com.softwaremill.okapi.core.OutboxProcessingEvent.Retried +import com.softwaremill.okapi.core.OutboxProcessorListener +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Timer +import java.time.Duration + +/** + * [OutboxProcessorListener] that records delivery outcomes as Micrometer counters + * and batch duration as a timer. 
+ * + * Registered metrics: + * - `okapi.entries.delivered` — counter + * - `okapi.entries.retried` — counter + * - `okapi.entries.failed` — counter + * - `okapi.batch.duration` — timer + */ +class MicrometerOutboxListener(registry: MeterRegistry) : OutboxProcessorListener { + private val deliveredCounter = Counter.builder("okapi.entries.delivered").register(registry) + private val retriedCounter = Counter.builder("okapi.entries.retried").register(registry) + private val failedCounter = Counter.builder("okapi.entries.failed").register(registry) + private val batchTimer = Timer.builder("okapi.batch.duration").register(registry) + + override fun onEntryProcessed(event: OutboxProcessingEvent) = when (event) { + is Delivered -> deliveredCounter.increment() + is Retried -> retriedCounter.increment() + is Failed -> failedCounter.increment() + } + + override fun onBatchProcessed(processedCount: Int, duration: Duration) { + batchTimer.record(duration) + } +} diff --git a/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt new file mode 100644 index 0000000..0aaa7ce --- /dev/null +++ b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt @@ -0,0 +1,66 @@ +package com.softwaremill.okapi.micrometer + +import com.softwaremill.okapi.core.DeliveryInfo +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxMessage +import com.softwaremill.okapi.core.OutboxProcessingEvent +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import java.time.Duration +import java.time.Instant + +private val stubDeliveryInfo = + object : DeliveryInfo { + override val type = "stub" + + override fun serialize(): String = """{"type":"stub"}""" + } + +private fun stubEntry(): OutboxEntry = 
OutboxEntry.createPending(OutboxMessage("test.event", "{}"), stubDeliveryInfo, Instant.EPOCH) + +class MicrometerOutboxListenerTest : FunSpec({ + + test("Delivered event increments delivered counter") { + val registry = SimpleMeterRegistry() + val listener = MicrometerOutboxListener(registry) + val entry = stubEntry().toDelivered(Instant.EPOCH) + + listener.onEntryProcessed(OutboxProcessingEvent.Delivered(entry, Duration.ofMillis(42))) + + registry.counter("okapi.entries.delivered").count() shouldBe 1.0 + registry.counter("okapi.entries.retried").count() shouldBe 0.0 + registry.counter("okapi.entries.failed").count() shouldBe 0.0 + } + + test("Retried event increments retried counter") { + val registry = SimpleMeterRegistry() + val listener = MicrometerOutboxListener(registry) + val entry = stubEntry().retry(Instant.EPOCH, "timeout") + + listener.onEntryProcessed(OutboxProcessingEvent.Retried(entry, Duration.ofMillis(10), "timeout")) + + registry.counter("okapi.entries.retried").count() shouldBe 1.0 + } + + test("Failed event increments failed counter") { + val registry = SimpleMeterRegistry() + val listener = MicrometerOutboxListener(registry) + val entry = stubEntry().toFailed(Instant.EPOCH, "bad request") + + listener.onEntryProcessed(OutboxProcessingEvent.Failed(entry, Duration.ofMillis(5), "bad request")) + + registry.counter("okapi.entries.failed").count() shouldBe 1.0 + } + + test("onBatchProcessed records duration in timer") { + val registry = SimpleMeterRegistry() + val listener = MicrometerOutboxListener(registry) + + listener.onBatchProcessed(3, Duration.ofMillis(150)) + + val timer = registry.timer("okapi.batch.duration") + timer.count() shouldBe 1 + timer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS) shouldBe 150.0 + } +}) From 655288e8aea2000534631881ef1aaf740d374c9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 8 Apr 2026 12:03:06 +0200 Subject: [PATCH 04/11] feat(micrometer): add MicrometerOutboxMetrics 
gauges (KOJAK-44) Registers count-per-status and lag-per-status gauges that poll OutboxStore on each Prometheus scrape. Gauge suppliers are wrapped in an optional TransactionRunner (required for Exposed-backed stores) with try-catch returning NaN on failure. --- okapi-micrometer/build.gradle.kts | 2 + .../micrometer/MicrometerOutboxMetrics.kt | 58 +++++++++ .../micrometer/MicrometerOutboxMetricsTest.kt | 113 ++++++++++++++++++ 3 files changed, 173 insertions(+) create mode 100644 okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetrics.kt create mode 100644 okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetricsTest.kt diff --git a/okapi-micrometer/build.gradle.kts b/okapi-micrometer/build.gradle.kts index 939eda8..98b7b3d 100644 --- a/okapi-micrometer/build.gradle.kts +++ b/okapi-micrometer/build.gradle.kts @@ -4,10 +4,12 @@ plugins { dependencies { implementation(project(":okapi-core")) + implementation(libs.slf4jApi) compileOnly(libs.micrometerCore) testImplementation(libs.kotestRunnerJunit5) testImplementation(libs.kotestAssertionsCore) testImplementation(libs.micrometerCore) testImplementation(libs.micrometerTest) + testRuntimeOnly(libs.slf4jSimple) } diff --git a/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetrics.kt b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetrics.kt new file mode 100644 index 0000000..b23edca --- /dev/null +++ b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetrics.kt @@ -0,0 +1,58 @@ +package com.softwaremill.okapi.micrometer + +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.core.TransactionRunner +import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.MeterRegistry +import org.slf4j.LoggerFactory +import java.time.Clock +import java.time.Duration + 
+/** + * Registers Micrometer gauges that poll [OutboxStore] on every Prometheus scrape. + * + * Registered metrics: + * - `okapi.entries.count` (tag: status) — number of entries per status + * - `okapi.entries.lag.seconds` (tag: status) — age of the oldest entry per status + * + * Gauge suppliers run on the Prometheus scrape thread, which has no ambient transaction. + * Store implementations backed by Exposed (e.g. [com.softwaremill.okapi.postgres.PostgresOutboxStore]) + * require an active transaction on the calling thread, so a [TransactionRunner] must be supplied + * when using such stores. A read-only [TransactionRunner] is recommended. + * + * If a store call throws, the gauge returns [Double.NaN] and the exception is logged at WARN. + * This surfaces database outages as visible metric gaps instead of silently reporting zero. + */ +class MicrometerOutboxMetrics( + private val store: OutboxStore, + registry: MeterRegistry, + private val transactionRunner: TransactionRunner? = null, + private val clock: Clock = Clock.systemUTC(), +) { + init { + for (status in OutboxStatus.entries) { + Gauge.builder("okapi.entries.count") { safeQuery { store.countByStatuses()[status]?.toDouble() ?: 0.0 } } + .tag("status", status.name.lowercase()) + .register(registry) + + Gauge.builder("okapi.entries.lag.seconds") { + safeQuery { + store.findOldestCreatedAt(setOf(status))[status] + ?.let { Duration.between(it, clock.instant()).toMillis() / 1000.0 } ?: 0.0 + } + }.tag("status", status.name.lowercase()).register(registry) + } + } + + private fun safeQuery(query: () -> Double): Double = try { + transactionRunner?.runInTransaction(query) ?: query() + } catch (e: Exception) { + logger.warn("Failed to read outbox metric from store", e) + Double.NaN + } + + companion object { + private val logger = LoggerFactory.getLogger(MicrometerOutboxMetrics::class.java) + } +} diff --git a/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetricsTest.kt 
b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetricsTest.kt new file mode 100644 index 0000000..901d0ab --- /dev/null +++ b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetricsTest.kt @@ -0,0 +1,113 @@ +package com.softwaremill.okapi.micrometer + +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.core.TransactionRunner +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import java.time.Clock +import java.time.Instant +import java.time.ZoneOffset + +private fun stubStore(counts: Map = emptyMap(), oldest: Map = emptyMap()) = + object : OutboxStore { + override fun persist(entry: OutboxEntry) = entry + override fun claimPending(limit: Int) = emptyList() + override fun updateAfterProcessing(entry: OutboxEntry) = entry + override fun removeDeliveredBefore(time: Instant, limit: Int) = 0 + + override fun findOldestCreatedAt(statuses: Set) = oldest.filterKeys { it in statuses } + + override fun countByStatuses() = counts + } + +private fun throwingStore(error: Exception = RuntimeException("db down")) = object : OutboxStore { + override fun persist(entry: OutboxEntry) = entry + override fun claimPending(limit: Int) = emptyList() + override fun updateAfterProcessing(entry: OutboxEntry) = entry + override fun removeDeliveredBefore(time: Instant, limit: Int) = 0 + override fun findOldestCreatedAt(statuses: Set): Map = throw error + override fun countByStatuses(): Map = throw error +} + +class MicrometerOutboxMetricsTest : FunSpec({ + + test("count gauge reflects store values per status") { + val store = stubStore( + counts = mapOf(OutboxStatus.PENDING to 5L, OutboxStatus.FAILED to 2L), + ) + val registry = SimpleMeterRegistry() + MicrometerOutboxMetrics(store, registry) + + val 
pendingGauge = registry.find("okapi.entries.count").tag("status", "pending").gauge() + val failedGauge = registry.find("okapi.entries.count").tag("status", "failed").gauge() + val deliveredGauge = registry.find("okapi.entries.count").tag("status", "delivered").gauge() + + pendingGauge!!.value() shouldBe 5.0 + failedGauge!!.value() shouldBe 2.0 + deliveredGauge!!.value() shouldBe 0.0 + } + + test("lag gauge computes seconds between oldest entry and now") { + val now = Instant.parse("2024-01-01T00:01:00Z") + val oldest = Instant.parse("2024-01-01T00:00:00Z") // 60 seconds ago + val fixedClock = Clock.fixed(now, ZoneOffset.UTC) + val store = stubStore(oldest = mapOf(OutboxStatus.PENDING to oldest)) + val registry = SimpleMeterRegistry() + MicrometerOutboxMetrics(store, registry, clock = fixedClock) + + val lagGauge = registry.find("okapi.entries.lag.seconds").tag("status", "pending").gauge() + lagGauge!!.value() shouldBe 60.0 + } + + test("lag gauge returns 0 when no entries for status") { + val store = stubStore() + val registry = SimpleMeterRegistry() + MicrometerOutboxMetrics(store, registry) + + val lagGauge = registry.find("okapi.entries.lag.seconds").tag("status", "pending").gauge() + lagGauge!!.value() shouldBe 0.0 + } + + test("transactionRunner wraps gauge queries when provided") { + val store = stubStore(counts = mapOf(OutboxStatus.PENDING to 7L)) + val registry = SimpleMeterRegistry() + var wrapCount = 0 + val recordingRunner = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T { + wrapCount++ + return block() + } + } + MicrometerOutboxMetrics(store, registry, transactionRunner = recordingRunner) + + val pendingGauge = registry.find("okapi.entries.count").tag("status", "pending").gauge() + pendingGauge!!.value() shouldBe 7.0 + wrapCount shouldBe 1 + } + + test("gauge returns NaN and logs when store throws") { + val registry = SimpleMeterRegistry() + MicrometerOutboxMetrics(throwingStore(), registry) + + val countGauge = 
registry.find("okapi.entries.count").tag("status", "pending").gauge() + val lagGauge = registry.find("okapi.entries.lag.seconds").tag("status", "pending").gauge() + + countGauge!!.value().isNaN() shouldBe true + lagGauge!!.value().isNaN() shouldBe true + } + + test("gauge returns NaN when transactionRunner throws") { + val store = stubStore(counts = mapOf(OutboxStatus.PENDING to 5L)) + val registry = SimpleMeterRegistry() + val failingRunner = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = throw IllegalStateException("no tx manager") + } + MicrometerOutboxMetrics(store, registry, transactionRunner = failingRunner) + + val pendingGauge = registry.find("okapi.entries.count").tag("status", "pending").gauge() + pendingGauge!!.value().isNaN() shouldBe true + } +}) From 01840cfa72283b9f791fe12e343a5d918db67cc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 8 Apr 2026 12:09:24 +0200 Subject: [PATCH 05/11] feat(spring-boot): autoconfigure Micrometer listener and gauges (KOJAK-44) Add MicrometerConfiguration inner class that creates MicrometerOutboxListener and MicrometerOutboxMetrics beans when MeterRegistry is on the classpath. OutboxProcessor bean now accepts an optional OutboxProcessorListener. 
--- .../okapi/core/OutboxProcessorTest.kt | 35 +++++++++++++++++ okapi-spring-boot/build.gradle.kts | 4 ++ .../springboot/OutboxAutoConfiguration.kt | 39 ++++++++++++++++++- .../OutboxProcessorAutoConfigurationTest.kt | 13 +++++++ 4 files changed, 90 insertions(+), 1 deletion(-) diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt index 9dbdd11..ffed0b4 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt @@ -275,4 +275,39 @@ class OutboxProcessorTest : } } } + + given("processNext() with a listener — RetriableFailure exhausts retries") { + val events = mutableListOf() + + val listener = object : OutboxProcessorListener { + override fun onEntryProcessed(event: OutboxProcessingEvent) { + events += event + } + } + + `when`("retries are exhausted (maxRetries=0)") { + pendingEntries = listOf(stubEntry()) + val entryProcessor = OutboxEntryProcessor( + deliverer = stubDeliverer(DeliveryResult.RetriableFailure("still failing")), + retryPolicy = RetryPolicy(maxRetries = 0), + clock = fixedClock, + ) + OutboxProcessor(store, entryProcessor, listener = listener, clock = fixedClock) + .processNext() + val capturedEvents = events.toList() + val capturedProcessed = processedEntries.toList() + + then("entry is marked FAILED") { + capturedProcessed.first().status shouldBe OutboxStatus.FAILED + } + then("listener receives Failed event, not Retried") { + capturedEvents.size shouldBe 1 + capturedEvents.first() shouldBe OutboxProcessingEvent.Failed( + entry = capturedProcessed.first(), + duration = Duration.ZERO, + error = "still failing", + ) + } + } + } }) diff --git a/okapi-spring-boot/build.gradle.kts b/okapi-spring-boot/build.gradle.kts index 5512268..395d4c2 100644 --- a/okapi-spring-boot/build.gradle.kts +++ 
b/okapi-spring-boot/build.gradle.kts @@ -19,6 +19,8 @@ dependencies { compileOnly(project(":okapi-postgres")) compileOnly(project(":okapi-mysql")) compileOnly(libs.liquibaseCore) + compileOnly(project(":okapi-micrometer")) + compileOnly(libs.micrometerCore) testImplementation(libs.kotestRunnerJunit5) testImplementation(libs.kotestAssertionsCore) @@ -41,6 +43,8 @@ dependencies { testImplementation(libs.testcontainersMysql) testImplementation(libs.mysql) testImplementation(libs.wiremock) + testImplementation(project(":okapi-micrometer")) + testImplementation(libs.micrometerCore) } // CI version override: ./gradlew :okapi-spring-boot:test -PspringBootVersion=4.0.4 -PspringVersion=7.0.6 diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 53268d8..8d76e0f 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -4,13 +4,17 @@ import com.softwaremill.okapi.core.CompositeMessageDeliverer import com.softwaremill.okapi.core.MessageDeliverer import com.softwaremill.okapi.core.OutboxEntryProcessor import com.softwaremill.okapi.core.OutboxProcessor +import com.softwaremill.okapi.core.OutboxProcessorListener import com.softwaremill.okapi.core.OutboxPublisher import com.softwaremill.okapi.core.OutboxPurgerConfig import com.softwaremill.okapi.core.OutboxSchedulerConfig import com.softwaremill.okapi.core.OutboxStore import com.softwaremill.okapi.core.RetryPolicy +import com.softwaremill.okapi.micrometer.MicrometerOutboxListener +import com.softwaremill.okapi.micrometer.MicrometerOutboxMetrics import com.softwaremill.okapi.mysql.MysqlOutboxStore import com.softwaremill.okapi.postgres.PostgresOutboxStore +import io.micrometer.core.instrument.MeterRegistry import 
liquibase.integration.spring.SpringLiquibase import org.springframework.beans.factory.ObjectProvider import org.springframework.boot.autoconfigure.AutoConfiguration @@ -87,10 +91,15 @@ class OutboxAutoConfiguration( @Bean @ConditionalOnMissingBean - fun outboxProcessor(outboxStore: OutboxStore, outboxEntryProcessor: OutboxEntryProcessor): OutboxProcessor { + fun outboxProcessor( + outboxStore: OutboxStore, + outboxEntryProcessor: OutboxEntryProcessor, + listener: ObjectProvider, + ): OutboxProcessor { return OutboxProcessor( store = outboxStore, entryProcessor = outboxEntryProcessor, + listener = listener.getIfAvailable(), ) } @@ -181,6 +190,34 @@ class OutboxAutoConfiguration( } } + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass(name = ["io.micrometer.core.instrument.MeterRegistry"]) + @ConditionalOnBean(MeterRegistry::class) + class MicrometerConfiguration { + @Bean + @ConditionalOnMissingBean + fun micrometerOutboxListener(registry: MeterRegistry): MicrometerOutboxListener = MicrometerOutboxListener(registry) + + @Bean + @ConditionalOnMissingBean + fun micrometerOutboxMetrics( + store: OutboxStore, + registry: MeterRegistry, + transactionManager: ObjectProvider, + clock: ObjectProvider, + ): MicrometerOutboxMetrics { + val readOnlyRunner = transactionManager.getIfAvailable()?.let { tm -> + SpringTransactionRunner(TransactionTemplate(tm).apply { isReadOnly = true }) + } + return MicrometerOutboxMetrics( + store = store, + registry = registry, + transactionRunner = readOnlyRunner, + clock = clock.getIfAvailable { Clock.systemUTC() }, + ) + } + } + companion object { internal fun resolveDataSource( dataSources: Map, diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt index 713755c..ade1f17 100644 --- 
a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt @@ -5,6 +5,8 @@ import com.softwaremill.okapi.core.MessageDeliverer import com.softwaremill.okapi.core.OutboxEntry import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.micrometer.MicrometerOutboxListener +import com.softwaremill.okapi.micrometer.MicrometerOutboxMetrics import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.nulls.shouldNotBeNull import io.kotest.matchers.shouldBe @@ -85,6 +87,17 @@ class OutboxProcessorAutoConfigurationTest : FunSpec({ callbackInvoked shouldBe true } } + + test("listener is wired into processor when MeterRegistry is present") { + contextRunner + .withBean(io.micrometer.core.instrument.MeterRegistry::class.java, { + io.micrometer.core.instrument.simple.SimpleMeterRegistry() + }) + .run { ctx -> + ctx.getBean(MicrometerOutboxListener::class.java).shouldNotBeNull() + ctx.getBean(MicrometerOutboxMetrics::class.java).shouldNotBeNull() + } + } }) private fun stubStore() = object : OutboxStore { From 0d9b7ec0dfa5078e1e85111f223c9f60ff0b6199 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Tue, 14 Apr 2026 10:13:17 +0200 Subject: [PATCH 06/11] =?UTF-8?q?refactor(core):=20rename=20Retried=20?= =?UTF-8?q?=E2=86=92=20RetryScheduled=20(KOJAK-44)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit "Retried" (past tense) implied the retry already happened, but the event is emitted when a failed delivery attempt is rescheduled for another try — even on the very first attempt. "RetryScheduled" is semantically accurate regardless of the attempt number. 
Renamed across: sealed event, OutboxProcessor mapping, MicrometerOutboxListener counter (okapi.entries.retried → okapi.entries.retry_scheduled), and all tests. --- .../okapi/core/OutboxProcessingEvent.kt | 19 +++++++++++++++++++ .../okapi/core/OutboxProcessor.kt | 2 +- .../okapi/core/OutboxProcessorTest.kt | 6 +++--- .../micrometer/MicrometerOutboxListener.kt | 8 ++++---- .../MicrometerOutboxListenerTest.kt | 8 ++++---- 5 files changed, 31 insertions(+), 12 deletions(-) create mode 100644 okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessingEvent.kt diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessingEvent.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessingEvent.kt new file mode 100644 index 0000000..8970df9 --- /dev/null +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessingEvent.kt @@ -0,0 +1,19 @@ +package com.softwaremill.okapi.core + +import java.time.Duration + +/** + * Outcome of processing a single [OutboxEntry], emitted by [OutboxProcessor] + * to [OutboxProcessorListener]. + * + * Sealed hierarchy enables exhaustive `when` in Kotlin — the compiler warns + * if a new subtype is added and a consumer does not handle it. 
+ */ +sealed interface OutboxProcessingEvent { + val entry: OutboxEntry + val duration: Duration + + data class Delivered(override val entry: OutboxEntry, override val duration: Duration) : OutboxProcessingEvent + data class RetryScheduled(override val entry: OutboxEntry, override val duration: Duration, val error: String) : OutboxProcessingEvent + data class Failed(override val entry: OutboxEntry, override val duration: Duration, val error: String) : OutboxProcessingEvent +} diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt index a9e4fb0..08c2568 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt @@ -36,7 +36,7 @@ class OutboxProcessor( try { val event = when (updated.status) { OutboxStatus.DELIVERED -> OutboxProcessingEvent.Delivered(updated, duration) - OutboxStatus.PENDING -> OutboxProcessingEvent.Retried(updated, duration, updated.lastError ?: "") + OutboxStatus.PENDING -> OutboxProcessingEvent.RetryScheduled(updated, duration, updated.lastError ?: "") OutboxStatus.FAILED -> OutboxProcessingEvent.Failed(updated, duration, updated.lastError ?: "") } listener.onEntryProcessed(event) diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt index ffed0b4..167b6a5 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt @@ -187,9 +187,9 @@ class OutboxProcessorTest : val capturedEvents = events.toList() val capturedProcessed = processedEntries.toList() - then("listener receives Retried event with error") { + then("listener receives RetryScheduled event with error") { capturedEvents.size shouldBe 1 - 
capturedEvents.first() shouldBe OutboxProcessingEvent.Retried( + capturedEvents.first() shouldBe OutboxProcessingEvent.RetryScheduled( entry = capturedProcessed.first(), duration = Duration.ZERO, error = "timeout", @@ -300,7 +300,7 @@ class OutboxProcessorTest : then("entry is marked FAILED") { capturedProcessed.first().status shouldBe OutboxStatus.FAILED } - then("listener receives Failed event, not Retried") { + then("listener receives Failed event, not RetryScheduled") { capturedEvents.size shouldBe 1 capturedEvents.first() shouldBe OutboxProcessingEvent.Failed( entry = capturedProcessed.first(), diff --git a/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt index 6a3b6d9..3a3c81c 100644 --- a/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt +++ b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt @@ -3,7 +3,7 @@ package com.softwaremill.okapi.micrometer import com.softwaremill.okapi.core.OutboxProcessingEvent import com.softwaremill.okapi.core.OutboxProcessingEvent.Delivered import com.softwaremill.okapi.core.OutboxProcessingEvent.Failed -import com.softwaremill.okapi.core.OutboxProcessingEvent.Retried +import com.softwaremill.okapi.core.OutboxProcessingEvent.RetryScheduled import com.softwaremill.okapi.core.OutboxProcessorListener import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.MeterRegistry @@ -16,19 +16,19 @@ import java.time.Duration * * Registered metrics: * - `okapi.entries.delivered` — counter - * - `okapi.entries.retried` — counter + * - `okapi.entries.retry_scheduled` — counter * - `okapi.entries.failed` — counter * - `okapi.batch.duration` — timer */ class MicrometerOutboxListener(registry: MeterRegistry) : OutboxProcessorListener { private val deliveredCounter = 
Counter.builder("okapi.entries.delivered").register(registry) - private val retriedCounter = Counter.builder("okapi.entries.retried").register(registry) + private val retryScheduledCounter = Counter.builder("okapi.entries.retry_scheduled").register(registry) private val failedCounter = Counter.builder("okapi.entries.failed").register(registry) private val batchTimer = Timer.builder("okapi.batch.duration").register(registry) override fun onEntryProcessed(event: OutboxProcessingEvent) = when (event) { is Delivered -> deliveredCounter.increment() - is Retried -> retriedCounter.increment() + is RetryScheduled -> retryScheduledCounter.increment() is Failed -> failedCounter.increment() } diff --git a/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt index 0aaa7ce..a961872 100644 --- a/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt +++ b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt @@ -29,18 +29,18 @@ class MicrometerOutboxListenerTest : FunSpec({ listener.onEntryProcessed(OutboxProcessingEvent.Delivered(entry, Duration.ofMillis(42))) registry.counter("okapi.entries.delivered").count() shouldBe 1.0 - registry.counter("okapi.entries.retried").count() shouldBe 0.0 + registry.counter("okapi.entries.retry_scheduled").count() shouldBe 0.0 registry.counter("okapi.entries.failed").count() shouldBe 0.0 } - test("Retried event increments retried counter") { + test("RetryScheduled event increments retry_scheduled counter") { val registry = SimpleMeterRegistry() val listener = MicrometerOutboxListener(registry) val entry = stubEntry().retry(Instant.EPOCH, "timeout") - listener.onEntryProcessed(OutboxProcessingEvent.Retried(entry, Duration.ofMillis(10), "timeout")) + 
listener.onEntryProcessed(OutboxProcessingEvent.RetryScheduled(entry, Duration.ofMillis(10), "timeout")) - registry.counter("okapi.entries.retried").count() shouldBe 1.0 + registry.counter("okapi.entries.retry_scheduled").count() shouldBe 1.0 } test("Failed event increments failed counter") { From 9ac04989ad309368e1875d8627930635621fa856 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Tue, 14 Apr 2026 10:50:17 +0200 Subject: [PATCH 07/11] fix: wire Clock into outboxProcessor bean + measure delivery time only (KOJAK-44) outboxProcessor bean now injects ObjectProvider<Clock>, consistent with all other beans in OutboxAutoConfiguration. Previously it silently fell back to Clock.systemUTC() even when a custom Clock bean was present. Per-entry duration now captures only the delivery attempt time (entryProcessor.process), excluding store.updateAfterProcessing(). This prevents DB write latency from inflating delivery metrics. --- .../main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt | 3 ++- .../softwaremill/okapi/springboot/OutboxAutoConfiguration.kt | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt index 08c2568..2249cb6 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt @@ -24,9 +24,10 @@ class OutboxProcessor( store.claimPending(limit).forEach { entry -> val entryStart = clock.instant() val updated = entryProcessor.process(entry) + val deliveryDuration = Duration.between(entryStart, clock.instant()) store.updateAfterProcessing(updated) count++ - notifyEntry(updated, Duration.between(entryStart, clock.instant())) + notifyEntry(updated, deliveryDuration) } notifyBatch(count, Duration.between(batchStart, clock.instant())) } diff --git
a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 8d76e0f..40b951d 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -95,11 +95,13 @@ class OutboxAutoConfiguration( outboxStore: OutboxStore, outboxEntryProcessor: OutboxEntryProcessor, listener: ObjectProvider, + clock: ObjectProvider, ): OutboxProcessor { return OutboxProcessor( store = outboxStore, entryProcessor = outboxEntryProcessor, listener = listener.getIfAvailable(), + clock = clock.getIfAvailable { Clock.systemUTC() }, ) } From 862bb58d37e27a8f3c8d0a2a4431584ddec9dbe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Tue, 14 Apr 2026 11:44:34 +0200 Subject: [PATCH 08/11] test(integration): add observability E2E tests on live Postgres + WireMock (KOJAK-44) Verifies the full observability pipeline against real infrastructure: - Retry-then-succeed: RetryScheduled counter + Delivered counter + gauges - Permanent failure: Failed counter + gauge reflects FAILED status - Batch duration: timer records realistic HTTP delivery time (50ms stub) - Lag gauge: reflects real time difference for pending entries in Postgres --- okapi-integration-tests/build.gradle.kts | 4 + .../test/e2e/ObservabilityEndToEndTest.kt | 176 ++++++++++++++++++ 2 files changed, 180 insertions(+) create mode 100644 okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt diff --git a/okapi-integration-tests/build.gradle.kts b/okapi-integration-tests/build.gradle.kts index 80002e1..7860b89 100644 --- a/okapi-integration-tests/build.gradle.kts +++ b/okapi-integration-tests/build.gradle.kts @@ -10,6 +10,7 @@ dependencies { testImplementation(project(":okapi-kafka")) 
testImplementation(project(":okapi-http")) testImplementation(project(":okapi-spring-boot")) + testImplementation(project(":okapi-micrometer")) // Test framework testImplementation(libs.kotestRunnerJunit5) @@ -43,6 +44,9 @@ dependencies { // WireMock (HTTP E2E tests) testImplementation(libs.wiremock) + // Micrometer (observability E2E tests) + testImplementation(libs.micrometerCore) + // Spring (for E2E tests that may need Spring context) testImplementation(libs.springContext) testImplementation(libs.springTx) diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt new file mode 100644 index 0000000..3b63795 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt @@ -0,0 +1,176 @@ +package com.softwaremill.okapi.test.e2e + +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock.aResponse +import com.github.tomakehurst.wiremock.client.WireMock.post +import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo +import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig +import com.softwaremill.okapi.core.OutboxEntryProcessor +import com.softwaremill.okapi.core.OutboxMessage +import com.softwaremill.okapi.core.OutboxProcessor +import com.softwaremill.okapi.core.OutboxPublisher +import com.softwaremill.okapi.core.RetryPolicy +import com.softwaremill.okapi.core.TransactionRunner +import com.softwaremill.okapi.http.HttpMessageDeliverer +import com.softwaremill.okapi.http.ServiceUrlResolver +import com.softwaremill.okapi.http.httpDeliveryInfo +import com.softwaremill.okapi.micrometer.MicrometerOutboxListener +import com.softwaremill.okapi.micrometer.MicrometerOutboxMetrics +import com.softwaremill.okapi.postgres.PostgresOutboxStore +import 
com.softwaremill.okapi.test.support.PostgresTestSupport +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.doubles.shouldBeGreaterThanOrEqual +import io.kotest.matchers.shouldBe +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import java.time.Clock +import java.util.concurrent.TimeUnit + +class ObservabilityEndToEndTest : FunSpec({ + val db = PostgresTestSupport() + val wiremock = WireMockServer(wireMockConfig().dynamicPort()) + val clock = Clock.systemUTC() + + val exposedTransactionRunner = object : TransactionRunner { + override fun <T> runInTransaction(block: () -> T): T = transaction { block() } + } + + beforeSpec { + db.start() + wiremock.start() + } + + afterSpec { + wiremock.stop() + db.stop() + } + + beforeEach { + wiremock.resetAll() + db.truncate() + } + + fun deliveryInfo() = httpDeliveryInfo { + serviceName = "test-service" + endpointPath = "/api/webhook" + } + + test("full pipeline: publish, deliver, verify Micrometer counters and gauges") { + val registry = SimpleMeterRegistry() + val store = PostgresOutboxStore(clock) + val publisher = OutboxPublisher(store, clock) + val listener = MicrometerOutboxListener(registry) + MicrometerOutboxMetrics(store, registry, transactionRunner = exposedTransactionRunner, clock = clock) + + val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } + val entryProcessor = OutboxEntryProcessor(HttpMessageDeliverer(urlResolver), RetryPolicy(maxRetries = 3), clock) + val processor = OutboxProcessor(store, entryProcessor, listener = listener, clock = clock) + + // Stub: first call → 500 (retriable), second call → 200 (success) + wiremock.stubFor( + post(urlEqualTo("/api/webhook")) + .inScenario("retry-then-succeed") + .whenScenarioStateIs("Started") + .willReturn(aResponse().withStatus(500)) + .willSetStateTo("first-failed"), + ) + wiremock.stubFor( + post(urlEqualTo("/api/webhook")) +
.inScenario("retry-then-succeed") + .whenScenarioStateIs("first-failed") + .willReturn(aResponse().withStatus(200)), + ) + + // Publish 1 message + transaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-1"}"""), deliveryInfo()) } + + // First processNext: HTTP 500 → RetryScheduled + transaction { processor.processNext() } + + registry.counter("okapi.entries.retry_scheduled").count() shouldBe 1.0 + registry.counter("okapi.entries.delivered").count() shouldBe 0.0 + registry.counter("okapi.entries.failed").count() shouldBe 0.0 + registry.timer("okapi.batch.duration").count() shouldBe 1 + + // Gauge: 1 PENDING entry + val pendingGauge = registry.find("okapi.entries.count").tag("status", "pending").gauge() + pendingGauge!!.value() shouldBe 1.0 + + // Second processNext: HTTP 200 → Delivered + transaction { processor.processNext() } + + registry.counter("okapi.entries.delivered").count() shouldBe 1.0 + registry.counter("okapi.entries.retry_scheduled").count() shouldBe 1.0 // still 1 from before + registry.timer("okapi.batch.duration").count() shouldBe 2 + + // Gauge: 0 PENDING, 1 DELIVERED + pendingGauge.value() shouldBe 0.0 + val deliveredGauge = registry.find("okapi.entries.count").tag("status", "delivered").gauge() + deliveredGauge!!.value() shouldBe 1.0 + } + + test("permanent failure: HTTP 400 → Failed counter incremented, gauge reflects FAILED") { + val registry = SimpleMeterRegistry() + val store = PostgresOutboxStore(clock) + val publisher = OutboxPublisher(store, clock) + val listener = MicrometerOutboxListener(registry) + MicrometerOutboxMetrics(store, registry, transactionRunner = exposedTransactionRunner, clock = clock) + + val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } + val entryProcessor = OutboxEntryProcessor(HttpMessageDeliverer(urlResolver), RetryPolicy(maxRetries = 3), clock) + val processor = OutboxProcessor(store, entryProcessor, listener = listener, clock = clock) + + wiremock.stubFor( + 
post(urlEqualTo("/api/webhook")) + .willReturn(aResponse().withStatus(400)), + ) + + transaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-2"}"""), deliveryInfo()) } + transaction { processor.processNext() } + + registry.counter("okapi.entries.failed").count() shouldBe 1.0 + registry.counter("okapi.entries.delivered").count() shouldBe 0.0 + + val failedGauge = registry.find("okapi.entries.count").tag("status", "failed").gauge() + failedGauge!!.value() shouldBe 1.0 + } + + test("batch duration timer records realistic delivery time") { + val registry = SimpleMeterRegistry() + val store = PostgresOutboxStore(clock) + val publisher = OutboxPublisher(store, clock) + val listener = MicrometerOutboxListener(registry) + + val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } + val entryProcessor = OutboxEntryProcessor(HttpMessageDeliverer(urlResolver), RetryPolicy(maxRetries = 3), clock) + val processor = OutboxProcessor(store, entryProcessor, listener = listener, clock = clock) + + wiremock.stubFor( + post(urlEqualTo("/api/webhook")) + .willReturn(aResponse().withStatus(200).withFixedDelay(50)), + ) + + transaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-3"}"""), deliveryInfo()) } + transaction { processor.processNext() } + + val timer = registry.timer("okapi.batch.duration") + timer.count() shouldBe 1 + timer.totalTime(TimeUnit.MILLISECONDS) shouldBeGreaterThanOrEqual 50.0 + } + + test("lag gauge reflects real time difference for pending entries") { + val registry = SimpleMeterRegistry() + val store = PostgresOutboxStore(clock) + val publisher = OutboxPublisher(store, clock) + MicrometerOutboxMetrics(store, registry, transactionRunner = exposedTransactionRunner, clock = clock) + + // Publish but don't process — entry stays PENDING + transaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-4"}"""), deliveryInfo()) } + + // Small sleep to create measurable lag + 
Thread.sleep(100) + + val lagGauge = registry.find("okapi.entries.lag.seconds").tag("status", "pending").gauge() + lagGauge!!.value() shouldBeGreaterThanOrEqual 0.05 // at least 50ms lag + } +}) From 6c68326e5cdb1ba483edc31e394da3e55c0b4fe9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 15 Apr 2026 15:40:03 +0200 Subject: [PATCH 09/11] fix(spring-boot): extract OkapiMicrometerAutoConfiguration as top-level (KOJAK-44) Inner @Configuration classes inside @AutoConfiguration do not reliably see beans from other autoconfigurations via @ConditionalOnBean. This caused MicrometerConfiguration to never activate because MeterRegistry was not yet available when the condition was evaluated. Fix: extract to a separate top-level @AutoConfiguration with its own @AutoConfigureAfter targeting the correct Spring Boot 4 package (org.springframework.boot.micrometer.metrics.autoconfigure). --- .../okapi/core/OutboxProcessorListener.kt | 18 +++++++ .../OkapiMicrometerAutoConfiguration.kt | 54 +++++++++++++++++++ .../springboot/OutboxAutoConfiguration.kt | 31 ----------- ...ot.autoconfigure.AutoConfiguration.imports | 1 + .../OutboxProcessorAutoConfigurationTest.kt | 2 +- 5 files changed, 74 insertions(+), 32 deletions(-) create mode 100644 okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessorListener.kt create mode 100644 okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMicrometerAutoConfiguration.kt diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessorListener.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessorListener.kt new file mode 100644 index 0000000..10bdaf2 --- /dev/null +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessorListener.kt @@ -0,0 +1,18 @@ +package com.softwaremill.okapi.core + +import java.time.Duration + +/** + * Callback interface for observing [OutboxProcessor] activity. 
+ * + * Default no-op implementations allow consumers to override only the + * methods they care about. Exceptions thrown by implementations are + * caught and logged — they never break processing. + */ +interface OutboxProcessorListener { + /** Called after each entry is processed (delivered, retried, or failed). */ + fun onEntryProcessed(event: OutboxProcessingEvent) {} + + /** Called after a full batch completes (even if empty). */ + fun onBatchProcessed(processedCount: Int, duration: Duration) {} +} diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMicrometerAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMicrometerAutoConfiguration.kt new file mode 100644 index 0000000..9b463f9 --- /dev/null +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMicrometerAutoConfiguration.kt @@ -0,0 +1,54 @@ +package com.softwaremill.okapi.springboot + +import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.micrometer.MicrometerOutboxListener +import com.softwaremill.okapi.micrometer.MicrometerOutboxMetrics +import io.micrometer.core.instrument.MeterRegistry +import org.springframework.beans.factory.ObjectProvider +import org.springframework.boot.autoconfigure.AutoConfiguration +import org.springframework.boot.autoconfigure.AutoConfigureAfter +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.context.annotation.Bean +import org.springframework.transaction.PlatformTransactionManager +import org.springframework.transaction.support.TransactionTemplate +import java.time.Clock + +/** + * Autoconfiguration for Okapi Micrometer observability beans. 
+ * + * Separated from [OutboxAutoConfiguration] as a top-level autoconfiguration + * so that [ConditionalOnBean] for [MeterRegistry] evaluates after the meter + * registry is created by Spring Boot's metrics autoconfiguration. + */ +@AutoConfiguration +@AutoConfigureAfter( + name = ["org.springframework.boot.micrometer.metrics.autoconfigure.CompositeMeterRegistryAutoConfiguration"], +) +@ConditionalOnClass(name = ["io.micrometer.core.instrument.MeterRegistry"]) +@ConditionalOnBean(MeterRegistry::class) +class OkapiMicrometerAutoConfiguration { + @Bean + @ConditionalOnMissingBean + fun micrometerOutboxListener(registry: MeterRegistry): MicrometerOutboxListener = MicrometerOutboxListener(registry) + + @Bean + @ConditionalOnMissingBean + fun micrometerOutboxMetrics( + store: OutboxStore, + registry: MeterRegistry, + transactionManager: ObjectProvider<PlatformTransactionManager>, + clock: ObjectProvider<Clock>, + ): MicrometerOutboxMetrics { + val readOnlyRunner = transactionManager.getIfAvailable()?.let { tm -> + SpringTransactionRunner(TransactionTemplate(tm).apply { isReadOnly = true }) + } + return MicrometerOutboxMetrics( + store = store, + registry = registry, + transactionRunner = readOnlyRunner, + clock = clock.getIfAvailable { Clock.systemUTC() }, + ) + } +} diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 40b951d..123d367 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -10,11 +10,8 @@ import com.softwaremill.okapi.core.OutboxPurgerConfig import com.softwaremill.okapi.core.OutboxSchedulerConfig import com.softwaremill.okapi.core.OutboxStore import com.softwaremill.okapi.core.RetryPolicy -import com.softwaremill.okapi.micrometer.MicrometerOutboxListener -import
com.softwaremill.okapi.micrometer.MicrometerOutboxMetrics import com.softwaremill.okapi.mysql.MysqlOutboxStore import com.softwaremill.okapi.postgres.PostgresOutboxStore -import io.micrometer.core.instrument.MeterRegistry import liquibase.integration.spring.SpringLiquibase import org.springframework.beans.factory.ObjectProvider import org.springframework.boot.autoconfigure.AutoConfiguration @@ -192,34 +189,6 @@ class OutboxAutoConfiguration( } } - @Configuration(proxyBeanMethods = false) - @ConditionalOnClass(name = ["io.micrometer.core.instrument.MeterRegistry"]) - @ConditionalOnBean(MeterRegistry::class) - class MicrometerConfiguration { - @Bean - @ConditionalOnMissingBean - fun micrometerOutboxListener(registry: MeterRegistry): MicrometerOutboxListener = MicrometerOutboxListener(registry) - - @Bean - @ConditionalOnMissingBean - fun micrometerOutboxMetrics( - store: OutboxStore, - registry: MeterRegistry, - transactionManager: ObjectProvider, - clock: ObjectProvider, - ): MicrometerOutboxMetrics { - val readOnlyRunner = transactionManager.getIfAvailable()?.let { tm -> - SpringTransactionRunner(TransactionTemplate(tm).apply { isReadOnly = true }) - } - return MicrometerOutboxMetrics( - store = store, - registry = registry, - transactionRunner = readOnlyRunner, - clock = clock.getIfAvailable { Clock.systemUTC() }, - ) - } - } - companion object { internal fun resolveDataSource( dataSources: Map, diff --git a/okapi-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/okapi-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 8987c56..e19af6d 100644 --- a/okapi-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/okapi-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1,2 @@ 
com.softwaremill.okapi.springboot.OutboxAutoConfiguration +com.softwaremill.okapi.springboot.OkapiMicrometerAutoConfiguration diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt index ade1f17..31752f0 100644 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt @@ -21,7 +21,7 @@ import javax.sql.DataSource class OutboxProcessorAutoConfigurationTest : FunSpec({ val contextRunner = ApplicationContextRunner() - .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java)) + .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java, OkapiMicrometerAutoConfiguration::class.java)) .withBean(OutboxStore::class.java, { stubStore() }) .withBean(MessageDeliverer::class.java, { stubDeliverer() }) .withBean(DataSource::class.java, { SimpleDriverDataSource() }) From ee4371f4af4503a661d1e79bf9a299e911f8d0a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 15 Apr 2026 15:41:21 +0200 Subject: [PATCH 10/11] docs: add okapi-micrometer to README modules and observability section (KOJAK-44) Add Observability section with metrics table and quick-start snippet. Update module diagram and table to include okapi-micrometer. --- README.md | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 6d33ece..b507d25 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,29 @@ Okapi implements the [transactional outbox pattern](https://softwaremill.com/mic - **Concurrent processing** — multiple processors can run in parallel using `FOR UPDATE SKIP LOCKED`, so messages are never processed twice simultaneously. 
- **Delivery result classification** — each transport classifies errors as `Success`, `RetriableFailure`, or `PermanentFailure`. For example, HTTP 429 is retriable while HTTP 400 is permanent. +## Observability + +Add `okapi-micrometer` to get Micrometer metrics out of the box: + +```kotlin +implementation("com.softwaremill.okapi:okapi-micrometer") +``` + +With Spring Boot Actuator, metrics appear automatically on `/actuator/prometheus`: + +| Metric | Type | Description | +|--------|------|-------------| +| `okapi.entries.delivered` | Counter | Successfully delivered entries | +| `okapi.entries.retry_scheduled` | Counter | Failed attempts rescheduled for retry | +| `okapi.entries.failed` | Counter | Permanently failed entries | +| `okapi.batch.duration` | Timer | Processing time per batch | +| `okapi.entries.count` | Gauge | Current entry count per status | +| `okapi.entries.lag.seconds` | Gauge | Age of the oldest entry per status | + +**Without Spring Boot:** create `MicrometerOutboxListener` and `MicrometerOutboxMetrics` manually and pass a `MeterRegistry`. + +**Custom listener:** implement `OutboxProcessorListener` to react to delivery events (logging, alerting, custom metrics). 
+ ## Modules ```mermaid @@ -103,9 +126,11 @@ graph BT MY[okapi-mysql] --> CORE HTTP[okapi-http] --> CORE KAFKA[okapi-kafka] --> CORE + MICRO[okapi-micrometer] --> CORE SPRING[okapi-spring-boot] --> CORE SPRING -.->|compileOnly| PG SPRING -.->|compileOnly| MY + SPRING -.->|compileOnly| MICRO BOM[okapi-bom] style CORE fill:#4a9eff,color:#fff @@ -119,7 +144,8 @@ graph BT | `okapi-mysql` | MySQL 8+ storage via Exposed ORM | | `okapi-http` | HTTP webhook delivery (JDK HttpClient) | | `okapi-kafka` | Kafka topic publishing | -| `okapi-spring-boot` | Spring Boot autoconfiguration (auto-detects store and transports) | +| `okapi-micrometer` | Micrometer metrics (counters, timers, gauges) | +| `okapi-spring-boot` | Spring Boot autoconfiguration (auto-detects store, transports, and metrics) | | `okapi-bom` | Bill of Materials for version alignment | ## Compatibility From 1d6cf40a962348cfe6a78f522d47dc46f510936c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 15 Apr 2026 16:37:32 +0200 Subject: [PATCH 11/11] fix: metric naming, README clarity, KDoc improvements (KOJAK-44) Rename okapi.entries.retry_scheduled to okapi.entries.retry.scheduled (dots-only follows Micrometer naming convention). Clarify README observability section, add tag names to gauge descriptions, document duration excludes DB write, single-listener note, autoconfig override. 
--- README.md | 16 ++++++++-------- .../okapi/core/OutboxProcessingEvent.kt | 5 ++--- .../okapi/core/OutboxProcessorListener.kt | 3 +++ .../okapi/test/e2e/ObservabilityEndToEndTest.kt | 4 ++-- .../okapi/micrometer/MicrometerOutboxListener.kt | 7 +++++-- .../okapi/micrometer/MicrometerOutboxMetrics.kt | 2 +- .../micrometer/MicrometerOutboxListenerTest.kt | 6 +++--- .../OkapiMicrometerAutoConfiguration.kt | 3 +++ 8 files changed, 27 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index b507d25..7861f4d 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ class OrderService( } ``` -Autoconfiguration handles scheduling, retries, and delivery automatically. +Autoconfiguration handles scheduling, retries, and delivery automatically. For Micrometer metrics, also add `okapi-micrometer` — see [Observability](#observability). **Using Kafka instead of HTTP?** Swap the deliverer bean and delivery info: @@ -97,26 +97,26 @@ Okapi implements the [transactional outbox pattern](https://softwaremill.com/mic ## Observability -Add `okapi-micrometer` to get Micrometer metrics out of the box: +Add `okapi-micrometer` alongside `okapi-spring-boot` (from the Quick Start above) to get Micrometer metrics: ```kotlin implementation("com.softwaremill.okapi:okapi-micrometer") ``` -With Spring Boot Actuator, metrics appear automatically on `/actuator/prometheus`: +With Spring Boot Actuator and a Prometheus registry (`micrometer-registry-prometheus`) on the classpath, metrics are served on `/actuator/prometheus` once that endpoint is exposed (e.g. `management.endpoints.web.exposure.include=prometheus,metrics`); they are also visible via `/actuator/metrics`. 
| Metric | Type | Description | |--------|------|-------------| | `okapi.entries.delivered` | Counter | Successfully delivered entries | -| `okapi.entries.retry_scheduled` | Counter | Failed attempts rescheduled for retry | +| `okapi.entries.retry.scheduled` | Counter | Failed attempts rescheduled for retry | | `okapi.entries.failed` | Counter | Permanently failed entries | | `okapi.batch.duration` | Timer | Processing time per batch | -| `okapi.entries.count` | Gauge | Current entry count per status | -| `okapi.entries.lag.seconds` | Gauge | Age of the oldest entry per status | +| `okapi.entries.count` | Gauge | Current entry count (tag: `status=pending\|delivered\|failed`) | +| `okapi.entries.lag.seconds` | Gauge | Age of oldest entry in seconds (tag: `status`) | -**Without Spring Boot:** create `MicrometerOutboxListener` and `MicrometerOutboxMetrics` manually and pass a `MeterRegistry`. +**Without Spring Boot:** create `MicrometerOutboxListener` and `MicrometerOutboxMetrics` manually and pass a `MeterRegistry`. `MicrometerOutboxMetrics` requires a `TransactionRunner` for Exposed-backed stores — see the class KDoc for details. -**Custom listener:** implement `OutboxProcessorListener` to react to delivery events (logging, alerting, custom metrics). +**Custom listener:** implement `OutboxProcessorListener` to react to delivery events (logging, alerting, custom metrics). `OutboxProcessor` accepts a single listener; to combine multiple, implement a composite that delegates to each. 
## Modules diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessingEvent.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessingEvent.kt index 8970df9..e3ce91a 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessingEvent.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessingEvent.kt @@ -5,12 +5,11 @@ import java.time.Duration /** * Outcome of processing a single [OutboxEntry], emitted by [OutboxProcessor] * to [OutboxProcessorListener]. - * - * Sealed hierarchy enables exhaustive `when` in Kotlin — the compiler warns - * if a new subtype is added and a consumer does not handle it. */ sealed interface OutboxProcessingEvent { val entry: OutboxEntry + + /** Wall-clock duration of the delivery attempt, excluding the database update. */ val duration: Duration data class Delivered(override val entry: OutboxEntry, override val duration: Duration) : OutboxProcessingEvent diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessorListener.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessorListener.kt index 10bdaf2..65fa38c 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessorListener.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessorListener.kt @@ -8,6 +8,9 @@ import java.time.Duration * Default no-op implementations allow consumers to override only the * methods they care about. Exceptions thrown by implementations are * caught and logged — they never break processing. + * + * [OutboxProcessor] accepts a single listener. To combine multiple listeners, + * implement a composite that delegates to each. */ interface OutboxProcessorListener { /** Called after each entry is processed (delivered, retried, or failed). 
*/ diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt index 3b63795..518000e 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt @@ -87,7 +87,7 @@ class ObservabilityEndToEndTest : FunSpec({ // First processNext: HTTP 500 → RetryScheduled transaction { processor.processNext() } - registry.counter("okapi.entries.retry_scheduled").count() shouldBe 1.0 + registry.counter("okapi.entries.retry.scheduled").count() shouldBe 1.0 registry.counter("okapi.entries.delivered").count() shouldBe 0.0 registry.counter("okapi.entries.failed").count() shouldBe 0.0 registry.timer("okapi.batch.duration").count() shouldBe 1 @@ -100,7 +100,7 @@ class ObservabilityEndToEndTest : FunSpec({ transaction { processor.processNext() } registry.counter("okapi.entries.delivered").count() shouldBe 1.0 - registry.counter("okapi.entries.retry_scheduled").count() shouldBe 1.0 // still 1 from before + registry.counter("okapi.entries.retry.scheduled").count() shouldBe 1.0 // still 1 from before registry.timer("okapi.batch.duration").count() shouldBe 2 // Gauge: 0 PENDING, 1 DELIVERED diff --git a/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt index 3a3c81c..63595dc 100644 --- a/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt +++ b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt @@ -16,13 +16,16 @@ import java.time.Duration * * Registered metrics: * - `okapi.entries.delivered` — counter - * - `okapi.entries.retry_scheduled` — counter + * - 
`okapi.entries.retry.scheduled` — counter * - `okapi.entries.failed` — counter * - `okapi.batch.duration` — timer + * + * Note: `processedCount` from [onBatchProcessed] is not recorded as a separate metric; + * per-entry counters provide sufficient granularity. */ class MicrometerOutboxListener(registry: MeterRegistry) : OutboxProcessorListener { private val deliveredCounter = Counter.builder("okapi.entries.delivered").register(registry) - private val retryScheduledCounter = Counter.builder("okapi.entries.retry_scheduled").register(registry) + private val retryScheduledCounter = Counter.builder("okapi.entries.retry.scheduled").register(registry) private val failedCounter = Counter.builder("okapi.entries.failed").register(registry) private val batchTimer = Timer.builder("okapi.batch.duration").register(registry) diff --git a/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetrics.kt b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetrics.kt index b23edca..9414980 100644 --- a/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetrics.kt +++ b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetrics.kt @@ -17,7 +17,7 @@ import java.time.Duration * - `okapi.entries.lag.seconds` (tag: status) — age of the oldest entry per status * * Gauge suppliers run on the Prometheus scrape thread, which has no ambient transaction. - * Store implementations backed by Exposed (e.g. [com.softwaremill.okapi.postgres.PostgresOutboxStore]) + * Store implementations backed by Exposed (e.g. `PostgresOutboxStore`, `MysqlOutboxStore`) * require an active transaction on the calling thread, so a [TransactionRunner] must be supplied * when using such stores. A read-only [TransactionRunner] is recommended. 
* diff --git a/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt index a961872..6b82d49 100644 --- a/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt +++ b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt @@ -29,18 +29,18 @@ class MicrometerOutboxListenerTest : FunSpec({ listener.onEntryProcessed(OutboxProcessingEvent.Delivered(entry, Duration.ofMillis(42))) registry.counter("okapi.entries.delivered").count() shouldBe 1.0 - registry.counter("okapi.entries.retry_scheduled").count() shouldBe 0.0 + registry.counter("okapi.entries.retry.scheduled").count() shouldBe 0.0 registry.counter("okapi.entries.failed").count() shouldBe 0.0 } - test("RetryScheduled event increments retry_scheduled counter") { + test("RetryScheduled event increments retry.scheduled counter") { val registry = SimpleMeterRegistry() val listener = MicrometerOutboxListener(registry) val entry = stubEntry().retry(Instant.EPOCH, "timeout") listener.onEntryProcessed(OutboxProcessingEvent.RetryScheduled(entry, Duration.ofMillis(10), "timeout")) - registry.counter("okapi.entries.retry_scheduled").count() shouldBe 1.0 + registry.counter("okapi.entries.retry.scheduled").count() shouldBe 1.0 } test("Failed event increments failed counter") { diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMicrometerAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMicrometerAutoConfiguration.kt index 9b463f9..f8c7c54 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMicrometerAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMicrometerAutoConfiguration.kt @@ -21,6 +21,9 @@ import java.time.Clock * Separated from 
[OutboxAutoConfiguration] as a top-level autoconfiguration * so that [ConditionalOnBean] for [MeterRegistry] evaluates after the meter * registry is created by Spring Boot's metrics autoconfiguration. + * + * Both beans are `@ConditionalOnMissingBean` — define your own [MicrometerOutboxListener] + * or [MicrometerOutboxMetrics] bean to override the defaults. */ @AutoConfiguration @AutoConfigureAfter(