diff --git a/README.md b/README.md index 6d33ece..7861f4d 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ class OrderService( } ``` -Autoconfiguration handles scheduling, retries, and delivery automatically. +Autoconfiguration handles scheduling, retries, and delivery automatically. For Micrometer metrics, also add `okapi-micrometer` — see [Observability](#observability). **Using Kafka instead of HTTP?** Swap the deliverer bean and delivery info: @@ -95,6 +95,29 @@ Okapi implements the [transactional outbox pattern](https://softwaremill.com/mic - **Concurrent processing** — multiple processors can run in parallel using `FOR UPDATE SKIP LOCKED`, so messages are never processed twice simultaneously. - **Delivery result classification** — each transport classifies errors as `Success`, `RetriableFailure`, or `PermanentFailure`. For example, HTTP 429 is retriable while HTTP 400 is permanent. +## Observability + +Add `okapi-micrometer` alongside `okapi-spring-boot` (from the Quick Start above) to get Micrometer metrics: + +```kotlin +implementation("com.softwaremill.okapi:okapi-micrometer") +``` + +With Spring Boot Actuator and a Prometheus registry (`micrometer-registry-prometheus`) on the classpath, metrics are automatically exposed on `/actuator/prometheus`. They are also visible via `/actuator/metrics`. 
+ +| Metric | Type | Description | +|--------|------|-------------| +| `okapi.entries.delivered` | Counter | Successfully delivered entries | +| `okapi.entries.retry.scheduled` | Counter | Failed attempts rescheduled for retry | +| `okapi.entries.failed` | Counter | Permanently failed entries | +| `okapi.batch.duration` | Timer | Processing time per batch | +| `okapi.entries.count` | Gauge | Current entry count (tag: `status=pending\|delivered\|failed`) | +| `okapi.entries.lag.seconds` | Gauge | Age of oldest entry in seconds (tag: `status`) | + +**Without Spring Boot:** create `MicrometerOutboxListener` and `MicrometerOutboxMetrics` manually and pass a `MeterRegistry`. `MicrometerOutboxMetrics` requires a `TransactionRunner` for Exposed-backed stores — see the class KDoc for details. + +**Custom listener:** implement `OutboxProcessorListener` to react to delivery events (logging, alerting, custom metrics). `OutboxProcessor` accepts a single listener; to combine multiple, implement a composite that delegates to each. 
+ ## Modules ```mermaid @@ -103,9 +126,11 @@ graph BT MY[okapi-mysql] --> CORE HTTP[okapi-http] --> CORE KAFKA[okapi-kafka] --> CORE + MICRO[okapi-micrometer] --> CORE SPRING[okapi-spring-boot] --> CORE SPRING -.->|compileOnly| PG SPRING -.->|compileOnly| MY + SPRING -.->|compileOnly| MICRO BOM[okapi-bom] style CORE fill:#4a9eff,color:#fff @@ -119,7 +144,8 @@ graph BT | `okapi-mysql` | MySQL 8+ storage via Exposed ORM | | `okapi-http` | HTTP webhook delivery (JDK HttpClient) | | `okapi-kafka` | Kafka topic publishing | -| `okapi-spring-boot` | Spring Boot autoconfiguration (auto-detects store and transports) | +| `okapi-micrometer` | Micrometer metrics (counters, timers, gauges) | +| `okapi-spring-boot` | Spring Boot autoconfiguration (auto-detects store, transports, and metrics) | | `okapi-bom` | Bill of Materials for version alignment | ## Compatibility diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 61d40fe..dd3425e 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -16,6 +16,7 @@ wiremock = "3.13.2" slf4j = "2.0.17" assertj = "3.27.7" h2 = "2.4.240" +micrometer = "1.15.6" [libraries] kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" } @@ -44,6 +45,8 @@ springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoc springBootStarterValidation = { module = "org.springframework.boot:spring-boot-starter-validation", version.ref = "springBoot" } springBootTest = { module = "org.springframework.boot:spring-boot-test", version.ref = "springBoot" } assertjCore = { module = "org.assertj:assertj-core", version.ref = "assertj" } +micrometerCore = { module = "io.micrometer:micrometer-core", version.ref = "micrometer" } +micrometerTest = { module = "io.micrometer:micrometer-test", version.ref = "micrometer" } wiremock = { module = "org.wiremock:wiremock", version.ref = "wiremock" } slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } 
slf4jSimple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" } diff --git a/okapi-bom/build.gradle.kts b/okapi-bom/build.gradle.kts index 588ba56..03c6076 100644 --- a/okapi-bom/build.gradle.kts +++ b/okapi-bom/build.gradle.kts @@ -13,5 +13,6 @@ dependencies { api(project(":okapi-http")) api(project(":okapi-kafka")) api(project(":okapi-spring-boot")) + api(project(":okapi-micrometer")) } } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessingEvent.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessingEvent.kt new file mode 100644 index 0000000..e3ce91a --- /dev/null +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessingEvent.kt @@ -0,0 +1,18 @@ +package com.softwaremill.okapi.core + +import java.time.Duration + +/** + * Outcome of processing a single [OutboxEntry], emitted by [OutboxProcessor] + * to [OutboxProcessorListener]. + */ +sealed interface OutboxProcessingEvent { + val entry: OutboxEntry + + /** Wall-clock duration of the delivery attempt, excluding the database update. 
*/ + val duration: Duration + + data class Delivered(override val entry: OutboxEntry, override val duration: Duration) : OutboxProcessingEvent + data class RetryScheduled(override val entry: OutboxEntry, override val duration: Duration, val error: String) : OutboxProcessingEvent + data class Failed(override val entry: OutboxEntry, override val duration: Duration, val error: String) : OutboxProcessingEvent +} diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt index b29d6cf..2249cb6 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt @@ -1,19 +1,61 @@ package com.softwaremill.okapi.core +import org.slf4j.LoggerFactory +import java.time.Clock +import java.time.Duration + /** * Orchestrates a single processing cycle: claims pending entries from [OutboxStore], * delegates each to [OutboxEntryProcessor], and persists the result. * - * Transaction management is the caller's responsibility. + * An optional [OutboxProcessorListener] is notified after each entry has been persisted and once after the + * full batch; the batch callback is skipped if claiming, processing or persisting throws. Exceptions in the listener are caught and logged at WARN — they never break + * processing. Durations are measured with the injected clock. Transaction management is the caller's responsibility. */ class OutboxProcessor( private val store: OutboxStore, private val entryProcessor: OutboxEntryProcessor, + private val listener: OutboxProcessorListener? 
= null, + private val clock: Clock = Clock.systemUTC(), ) { fun processNext(limit: Int = 10) { + val batchStart = clock.instant() + var count = 0 store.claimPending(limit).forEach { entry -> + val entryStart = clock.instant() val updated = entryProcessor.process(entry) + val deliveryDuration = Duration.between(entryStart, clock.instant()) /* taken before the store update, so persistence time is excluded */ store.updateAfterProcessing(updated) + count++ + notifyEntry(updated, deliveryDuration) } + notifyBatch(count, Duration.between(batchStart, clock.instant())) + } + + private fun notifyEntry(updated: OutboxEntry, duration: Duration) { + if (listener == null) return /* no listener configured: skip building the event */ + try { + val event = when (updated.status) { + OutboxStatus.DELIVERED -> OutboxProcessingEvent.Delivered(updated, duration) + OutboxStatus.PENDING -> OutboxProcessingEvent.RetryScheduled(updated, duration, updated.lastError ?: "") /* still PENDING after processing: reported as a scheduled retry */ + OutboxStatus.FAILED -> OutboxProcessingEvent.Failed(updated, duration, updated.lastError ?: "") + } + listener.onEntryProcessed(event) + } catch (e: Exception) { /* listener failures are logged and swallowed so the remaining entries are still processed */ + logger.warn("OutboxProcessorListener.onEntryProcessed failed", e) + } + } + + private fun notifyBatch(count: Int, duration: Duration) { + if (listener == null) return + try { + listener.onBatchProcessed(count, duration) + } catch (e: Exception) { + logger.warn("OutboxProcessorListener.onBatchProcessed failed", e) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(OutboxProcessor::class.java) } } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessorListener.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessorListener.kt new file mode 100644 index 0000000..65fa38c --- /dev/null +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessorListener.kt @@ -0,0 +1,21 @@ +package com.softwaremill.okapi.core + +import java.time.Duration + +/** + * Callback interface for observing [OutboxProcessor] activity. 
+ * + * Default no-op implementations allow consumers to override only the + * methods they care about. Exceptions thrown by implementations are + * caught and logged — they never break processing. + * + * [OutboxProcessor] accepts a single listener. To combine multiple listeners, + * implement a composite that delegates to each. + */ +interface OutboxProcessorListener { + /** Called once per entry after it has been processed and persisted; [event] tells whether it was delivered, scheduled for retry, or permanently failed. */ + fun onEntryProcessed(event: OutboxProcessingEvent) {} + + /** Called once after a full batch completes (even if empty) with the number of entries processed and the elapsed batch time. */ + fun onBatchProcessed(processedCount: Int, duration: Duration) {} +} diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt index 9d5bb1b..167b6a5 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt @@ -3,6 +3,7 @@ package com.softwaremill.okapi.core import io.kotest.core.spec.style.BehaviorSpec import io.kotest.matchers.shouldBe import java.time.Clock +import java.time.Duration import java.time.Instant import java.time.ZoneOffset @@ -123,4 +124,190 @@ class OutboxProcessorTest : } } } + + given("processNext() with a listener — delivery succeeds") { + val events = mutableListOf<OutboxProcessingEvent>() /* explicit element type: an empty mutableListOf() gives the compiler nothing to infer it from */ + var batchCount: Int? = null + var batchDuration: Duration? 
= null + + val listener = object : OutboxProcessorListener { + override fun onEntryProcessed(event: OutboxProcessingEvent) { + events += event + } + override fun onBatchProcessed(processedCount: Int, duration: Duration) { + batchCount = processedCount + batchDuration = duration + } + } + + `when`("delivery succeeds") { + pendingEntries = listOf(stubEntry()) + val entryProcessor = OutboxEntryProcessor( + deliverer = stubDeliverer(DeliveryResult.Success), + retryPolicy = RetryPolicy(maxRetries = 3), + clock = fixedClock, + ) + OutboxProcessor(store, entryProcessor, listener = listener, clock = fixedClock) + .processNext() + val capturedEvents = events.toList() + val capturedProcessed = processedEntries.toList() + + then("listener receives Delivered event") { + capturedEvents.size shouldBe 1 + capturedEvents.first() shouldBe OutboxProcessingEvent.Delivered( + entry = capturedProcessed.first(), + duration = Duration.ZERO, + ) + } + then("batch callback is invoked with count=1") { + batchCount shouldBe 1 + batchDuration shouldBe Duration.ZERO + } + } + } + + given("processNext() with a listener — delivery returns RetriableFailure") { + val events = mutableListOf<OutboxProcessingEvent>() + + val listener = object : OutboxProcessorListener { + override fun onEntryProcessed(event: OutboxProcessingEvent) { + events += event + } + } + + `when`("retries remain") { + pendingEntries = listOf(stubEntry()) + val entryProcessor = OutboxEntryProcessor( + deliverer = stubDeliverer(DeliveryResult.RetriableFailure("timeout")), + retryPolicy = RetryPolicy(maxRetries = 3), + clock = fixedClock, + ) + OutboxProcessor(store, entryProcessor, listener = listener, clock = fixedClock) + .processNext() + val capturedEvents = events.toList() + val capturedProcessed = processedEntries.toList() + + then("listener receives RetryScheduled event with error") { + capturedEvents.size shouldBe 1 + capturedEvents.first() shouldBe OutboxProcessingEvent.RetryScheduled( + entry = capturedProcessed.first(), + duration = 
Duration.ZERO, + error = "timeout", + ) + } + } + } + + given("processNext() with a listener — delivery returns PermanentFailure") { + val events = mutableListOf<OutboxProcessingEvent>() + + val listener = object : OutboxProcessorListener { + override fun onEntryProcessed(event: OutboxProcessingEvent) { + events += event + } + } + + `when`("delivery returns PermanentFailure") { + pendingEntries = listOf(stubEntry()) + val entryProcessor = OutboxEntryProcessor( + deliverer = stubDeliverer(DeliveryResult.PermanentFailure("bad request")), + retryPolicy = RetryPolicy(maxRetries = 3), + clock = fixedClock, + ) + OutboxProcessor(store, entryProcessor, listener = listener, clock = fixedClock) + .processNext() + val capturedEvents = events.toList() + val capturedProcessed = processedEntries.toList() + + then("listener receives Failed event with error") { + capturedEvents.size shouldBe 1 + capturedEvents.first() shouldBe OutboxProcessingEvent.Failed( + entry = capturedProcessed.first(), + duration = Duration.ZERO, + error = "bad request", + ) + } + } + } + + given("processNext() when listener throws") { + val throwingListener = object : OutboxProcessorListener { + override fun onEntryProcessed(event: OutboxProcessingEvent) { + throw RuntimeException("listener exploded") + } + override fun onBatchProcessed(processedCount: Int, duration: Duration) { + throw RuntimeException("batch listener exploded") + } + } + + `when`("entry notification throws") { + pendingEntries = listOf(stubEntry()) + val entryProcessor = OutboxEntryProcessor( + deliverer = stubDeliverer(DeliveryResult.Success), + retryPolicy = RetryPolicy(maxRetries = 3), + clock = fixedClock, + ) + OutboxProcessor(store, entryProcessor, listener = throwingListener, clock = fixedClock) + .processNext() + val capturedProcessed = processedEntries.toList() + + then("entry is still processed and persisted") { + capturedProcessed.size shouldBe 1 + capturedProcessed.first().status shouldBe OutboxStatus.DELIVERED + } + } + } + + given("processNext() 
with no listener (null)") { + `when`("entries are processed") { + pendingEntries = listOf(stubEntry()) + val entryProcessor = OutboxEntryProcessor( + deliverer = stubDeliverer(DeliveryResult.Success), + retryPolicy = RetryPolicy(maxRetries = 3), + clock = fixedClock, + ) + OutboxProcessor(store, entryProcessor).processNext() + val capturedProcessed = processedEntries.toList() + + then("processing works without NPE") { + capturedProcessed.size shouldBe 1 + capturedProcessed.first().status shouldBe OutboxStatus.DELIVERED + } + } + } + + given("processNext() with a listener — RetriableFailure exhausts retries") { + val events = mutableListOf<OutboxProcessingEvent>() + + val listener = object : OutboxProcessorListener { + override fun onEntryProcessed(event: OutboxProcessingEvent) { + events += event + } + } + + `when`("retries are exhausted (maxRetries=0)") { + pendingEntries = listOf(stubEntry()) + val entryProcessor = OutboxEntryProcessor( + deliverer = stubDeliverer(DeliveryResult.RetriableFailure("still failing")), + retryPolicy = RetryPolicy(maxRetries = 0), + clock = fixedClock, + ) + OutboxProcessor(store, entryProcessor, listener = listener, clock = fixedClock) + .processNext() + val capturedEvents = events.toList() + val capturedProcessed = processedEntries.toList() + + then("entry is marked FAILED") { + capturedProcessed.first().status shouldBe OutboxStatus.FAILED + } + then("listener receives Failed event, not RetryScheduled") { + capturedEvents.size shouldBe 1 + capturedEvents.first() shouldBe OutboxProcessingEvent.Failed( + entry = capturedProcessed.first(), + duration = Duration.ZERO, + error = "still failing", + ) + } + } + } }) diff --git a/okapi-integration-tests/build.gradle.kts b/okapi-integration-tests/build.gradle.kts index 80002e1..7860b89 100644 --- a/okapi-integration-tests/build.gradle.kts +++ b/okapi-integration-tests/build.gradle.kts @@ -10,6 +10,7 @@ dependencies { testImplementation(project(":okapi-kafka")) testImplementation(project(":okapi-http")) 
testImplementation(project(":okapi-spring-boot")) + testImplementation(project(":okapi-micrometer")) // Test framework testImplementation(libs.kotestRunnerJunit5) @@ -43,6 +44,9 @@ dependencies { // WireMock (HTTP E2E tests) testImplementation(libs.wiremock) + // Micrometer (observability E2E tests) + testImplementation(libs.micrometerCore) + // Spring (for E2E tests that may need Spring context) testImplementation(libs.springContext) testImplementation(libs.springTx) diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt new file mode 100644 index 0000000..518000e --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/e2e/ObservabilityEndToEndTest.kt @@ -0,0 +1,176 @@ +package com.softwaremill.okapi.test.e2e + +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock.aResponse +import com.github.tomakehurst.wiremock.client.WireMock.post +import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo +import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig +import com.softwaremill.okapi.core.OutboxEntryProcessor +import com.softwaremill.okapi.core.OutboxMessage +import com.softwaremill.okapi.core.OutboxProcessor +import com.softwaremill.okapi.core.OutboxPublisher +import com.softwaremill.okapi.core.RetryPolicy +import com.softwaremill.okapi.core.TransactionRunner +import com.softwaremill.okapi.http.HttpMessageDeliverer +import com.softwaremill.okapi.http.ServiceUrlResolver +import com.softwaremill.okapi.http.httpDeliveryInfo +import com.softwaremill.okapi.micrometer.MicrometerOutboxListener +import com.softwaremill.okapi.micrometer.MicrometerOutboxMetrics +import com.softwaremill.okapi.postgres.PostgresOutboxStore +import com.softwaremill.okapi.test.support.PostgresTestSupport +import 
io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.doubles.shouldBeGreaterThanOrEqual +import io.kotest.matchers.shouldBe +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import java.time.Clock +import java.util.concurrent.TimeUnit + +class ObservabilityEndToEndTest : FunSpec({ + val db = PostgresTestSupport() + val wiremock = WireMockServer(wireMockConfig().dynamicPort()) + val clock = Clock.systemUTC() + + val exposedTransactionRunner = object : TransactionRunner { /* runs each call inside an Exposed transaction; handed to MicrometerOutboxMetrics so gauge reads can query the store */ + override fun <T> runInTransaction(block: () -> T): T = transaction { block() } + } + + beforeSpec { + db.start() + wiremock.start() + } + + afterSpec { + wiremock.stop() + db.stop() + } + + beforeEach { + wiremock.resetAll() + db.truncate() + } + + fun deliveryInfo() = httpDeliveryInfo { + serviceName = "test-service" + endpointPath = "/api/webhook" + } + + test("full pipeline: publish, deliver, verify Micrometer counters and gauges") { + val registry = SimpleMeterRegistry() + val store = PostgresOutboxStore(clock) + val publisher = OutboxPublisher(store, clock) + val listener = MicrometerOutboxListener(registry) + MicrometerOutboxMetrics(store, registry, transactionRunner = exposedTransactionRunner, clock = clock) + + val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } + val entryProcessor = OutboxEntryProcessor(HttpMessageDeliverer(urlResolver), RetryPolicy(maxRetries = 3), clock) + val processor = OutboxProcessor(store, entryProcessor, listener = listener, clock = clock) + + // Stub: first call → 500 (retriable), second call → 200 (success) + wiremock.stubFor( + post(urlEqualTo("/api/webhook")) + .inScenario("retry-then-succeed") + .whenScenarioStateIs("Started") + .willReturn(aResponse().withStatus(500)) + .willSetStateTo("first-failed"), + ) + wiremock.stubFor( + post(urlEqualTo("/api/webhook")) + .inScenario("retry-then-succeed") + .whenScenarioStateIs("first-failed") + 
.willReturn(aResponse().withStatus(200)), + ) + + // Publish 1 message + transaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-1"}"""), deliveryInfo()) } + + // First processNext: HTTP 500 → RetryScheduled + transaction { processor.processNext() } + + registry.counter("okapi.entries.retry.scheduled").count() shouldBe 1.0 + registry.counter("okapi.entries.delivered").count() shouldBe 0.0 + registry.counter("okapi.entries.failed").count() shouldBe 0.0 + registry.timer("okapi.batch.duration").count() shouldBe 1 + + // Gauge: 1 PENDING entry + val pendingGauge = registry.find("okapi.entries.count").tag("status", "pending").gauge() + pendingGauge!!.value() shouldBe 1.0 + + // Second processNext: HTTP 200 → Delivered + transaction { processor.processNext() } + + registry.counter("okapi.entries.delivered").count() shouldBe 1.0 + registry.counter("okapi.entries.retry.scheduled").count() shouldBe 1.0 // still 1 from before + registry.timer("okapi.batch.duration").count() shouldBe 2 + + // Gauge: 0 PENDING, 1 DELIVERED + pendingGauge.value() shouldBe 0.0 + val deliveredGauge = registry.find("okapi.entries.count").tag("status", "delivered").gauge() + deliveredGauge!!.value() shouldBe 1.0 + } + + test("permanent failure: HTTP 400 → Failed counter incremented, gauge reflects FAILED") { + val registry = SimpleMeterRegistry() + val store = PostgresOutboxStore(clock) + val publisher = OutboxPublisher(store, clock) + val listener = MicrometerOutboxListener(registry) + MicrometerOutboxMetrics(store, registry, transactionRunner = exposedTransactionRunner, clock = clock) + + val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } + val entryProcessor = OutboxEntryProcessor(HttpMessageDeliverer(urlResolver), RetryPolicy(maxRetries = 3), clock) + val processor = OutboxProcessor(store, entryProcessor, listener = listener, clock = clock) + + wiremock.stubFor( + post(urlEqualTo("/api/webhook")) + .willReturn(aResponse().withStatus(400)), + 
) + + transaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-2"}"""), deliveryInfo()) } + transaction { processor.processNext() } + + registry.counter("okapi.entries.failed").count() shouldBe 1.0 + registry.counter("okapi.entries.delivered").count() shouldBe 0.0 + + val failedGauge = registry.find("okapi.entries.count").tag("status", "failed").gauge() + failedGauge!!.value() shouldBe 1.0 + } + + test("batch duration timer records realistic delivery time") { + val registry = SimpleMeterRegistry() + val store = PostgresOutboxStore(clock) + val publisher = OutboxPublisher(store, clock) + val listener = MicrometerOutboxListener(registry) + + val urlResolver = ServiceUrlResolver { "http://localhost:${wiremock.port()}" } + val entryProcessor = OutboxEntryProcessor(HttpMessageDeliverer(urlResolver), RetryPolicy(maxRetries = 3), clock) + val processor = OutboxProcessor(store, entryProcessor, listener = listener, clock = clock) + + wiremock.stubFor( + post(urlEqualTo("/api/webhook")) + .willReturn(aResponse().withStatus(200).withFixedDelay(50)), + ) + + transaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-3"}"""), deliveryInfo()) } + transaction { processor.processNext() } + + val timer = registry.timer("okapi.batch.duration") + timer.count() shouldBe 1 + timer.totalTime(TimeUnit.MILLISECONDS) shouldBeGreaterThanOrEqual 50.0 + } + + test("lag gauge reflects real time difference for pending entries") { + val registry = SimpleMeterRegistry() + val store = PostgresOutboxStore(clock) + val publisher = OutboxPublisher(store, clock) + MicrometerOutboxMetrics(store, registry, transactionRunner = exposedTransactionRunner, clock = clock) + + // Publish but don't process — entry stays PENDING + transaction { publisher.publish(OutboxMessage("order.created", """{"orderId":"e2e-4"}"""), deliveryInfo()) } + + // Small sleep to create measurable lag + Thread.sleep(100) + + val lagGauge = 
registry.find("okapi.entries.lag.seconds").tag("status", "pending").gauge() + lagGauge!!.value() shouldBeGreaterThanOrEqual 0.05 // at least 50ms lag + } +}) diff --git a/okapi-micrometer/build.gradle.kts b/okapi-micrometer/build.gradle.kts new file mode 100644 index 0000000..98b7b3d --- /dev/null +++ b/okapi-micrometer/build.gradle.kts @@ -0,0 +1,15 @@ +plugins { + id("buildsrc.convention.kotlin-jvm") +} + +dependencies { + implementation(project(":okapi-core")) + implementation(libs.slf4jApi) + compileOnly(libs.micrometerCore) + + testImplementation(libs.kotestRunnerJunit5) + testImplementation(libs.kotestAssertionsCore) + testImplementation(libs.micrometerCore) + testImplementation(libs.micrometerTest) + testRuntimeOnly(libs.slf4jSimple) +} diff --git a/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt new file mode 100644 index 0000000..63595dc --- /dev/null +++ b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListener.kt @@ -0,0 +1,41 @@ +package com.softwaremill.okapi.micrometer + +import com.softwaremill.okapi.core.OutboxProcessingEvent +import com.softwaremill.okapi.core.OutboxProcessingEvent.Delivered +import com.softwaremill.okapi.core.OutboxProcessingEvent.Failed +import com.softwaremill.okapi.core.OutboxProcessingEvent.RetryScheduled +import com.softwaremill.okapi.core.OutboxProcessorListener +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Timer +import java.time.Duration + +/** + * [OutboxProcessorListener] that records delivery outcomes as Micrometer counters + * and batch duration as a timer. 
+ * + * Registered metrics: + * - `okapi.entries.delivered` — counter + * - `okapi.entries.retry.scheduled` — counter + * - `okapi.entries.failed` — counter + * - `okapi.batch.duration` — timer + * + * Note: `processedCount` from [onBatchProcessed] is not recorded as a separate metric; + * per-entry counters provide sufficient granularity. + */ +class MicrometerOutboxListener(registry: MeterRegistry) : OutboxProcessorListener { + private val deliveredCounter = Counter.builder("okapi.entries.delivered").register(registry) + private val retryScheduledCounter = Counter.builder("okapi.entries.retry.scheduled").register(registry) + private val failedCounter = Counter.builder("okapi.entries.failed").register(registry) + private val batchTimer = Timer.builder("okapi.batch.duration").register(registry) + + override fun onEntryProcessed(event: OutboxProcessingEvent) = when (event) { /* only the event type is counted; event.duration is not recorded */ + is Delivered -> deliveredCounter.increment() + is RetryScheduled -> retryScheduledCounter.increment() + is Failed -> failedCounter.increment() + } + + override fun onBatchProcessed(processedCount: Int, duration: Duration) { + batchTimer.record(duration) + } +} diff --git a/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetrics.kt b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetrics.kt new file mode 100644 index 0000000..9414980 --- /dev/null +++ b/okapi-micrometer/src/main/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetrics.kt @@ -0,0 +1,58 @@ +package com.softwaremill.okapi.micrometer + +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.core.TransactionRunner +import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.MeterRegistry +import org.slf4j.LoggerFactory +import java.time.Clock +import java.time.Duration + +/** + * Registers Micrometer gauges whose values are read from [OutboxStore] each time a gauge is sampled (e.g. on every Prometheus scrape). 
+ * + * Registered metrics: + * - `okapi.entries.count` (tag: status) — number of entries per status; 0 when the store reports none for that status + * - `okapi.entries.lag.seconds` (tag: status) — age of the oldest entry per status, in seconds with millisecond resolution; 0 when there is no such entry + * + * Gauge suppliers run on the Prometheus scrape thread, which has no ambient transaction. + * Store implementations backed by Exposed (e.g. `PostgresOutboxStore`, `MysqlOutboxStore`) + * require an active transaction on the calling thread, so a [TransactionRunner] must be supplied + * when using such stores. A read-only [TransactionRunner] is recommended. + * + * If a store call throws, the gauge returns [Double.NaN] and the exception is logged at WARN. + * This surfaces database outages as visible metric gaps instead of silently reporting zero. + */ +class MicrometerOutboxMetrics( + private val store: OutboxStore, + registry: MeterRegistry, + private val transactionRunner: TransactionRunner? = null, + private val clock: Clock = Clock.systemUTC(), +) { + init { + for (status in OutboxStatus.entries) { /* one count gauge and one lag gauge per status; each gauge read issues its own store query */ + Gauge.builder("okapi.entries.count") { safeQuery { store.countByStatuses()[status]?.toDouble() ?: 0.0 } } + .tag("status", status.name.lowercase()) + .register(registry) + + Gauge.builder("okapi.entries.lag.seconds") { + safeQuery { + store.findOldestCreatedAt(setOf(status))[status] + ?.let { Duration.between(it, clock.instant()).toMillis() / 1000.0 } ?: 0.0 + } + }.tag("status", status.name.lowercase()).register(registry) + } + } + + private fun safeQuery(query: () -> Double): Double = try { + transactionRunner?.runInTransaction(query) ?: query() /* no runner configured: call the store directly on the sampling thread */ + } catch (e: Exception) { + logger.warn("Failed to read outbox metric from store", e) + Double.NaN + } + + companion object { + private val logger = LoggerFactory.getLogger(MicrometerOutboxMetrics::class.java) + } +} diff --git a/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt new file 
mode 100644
index 0000000..6b82d49
--- /dev/null
+++ b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxListenerTest.kt
@@ -0,0 +1,68 @@
+package com.softwaremill.okapi.micrometer
+
+import com.softwaremill.okapi.core.DeliveryInfo
+import com.softwaremill.okapi.core.OutboxEntry
+import com.softwaremill.okapi.core.OutboxMessage
+import com.softwaremill.okapi.core.OutboxProcessingEvent
+import io.kotest.core.spec.style.FunSpec
+import io.kotest.matchers.shouldBe
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+private val stubDeliveryInfo =
+    object : DeliveryInfo {
+        override val type = "stub"
+
+        override fun serialize(): String = """{"type":"stub"}"""
+    }
+
+/** Pending entry created at [Instant.EPOCH]; each test moves it into the state its event needs. */
+private fun stubEntry(): OutboxEntry = OutboxEntry.createPending(OutboxMessage("test.event", "{}"), stubDeliveryInfo, Instant.EPOCH)
+
+class MicrometerOutboxListenerTest : FunSpec({
+
+    test("Delivered event increments delivered counter") {
+        val registry = SimpleMeterRegistry()
+        val listener = MicrometerOutboxListener(registry)
+        val entry = stubEntry().toDelivered(Instant.EPOCH)
+
+        listener.onEntryProcessed(OutboxProcessingEvent.Delivered(entry, Duration.ofMillis(42)))
+
+        registry.counter("okapi.entries.delivered").count() shouldBe 1.0
+        registry.counter("okapi.entries.retry.scheduled").count() shouldBe 0.0
+        registry.counter("okapi.entries.failed").count() shouldBe 0.0
+    }
+
+    test("RetryScheduled event increments retry.scheduled counter") {
+        val registry = SimpleMeterRegistry()
+        val listener = MicrometerOutboxListener(registry)
+        val entry = stubEntry().retry(Instant.EPOCH, "timeout")
+
+        listener.onEntryProcessed(OutboxProcessingEvent.RetryScheduled(entry, Duration.ofMillis(10), "timeout"))
+
+        registry.counter("okapi.entries.retry.scheduled").count() shouldBe 1.0
+    }
+
+    test("Failed event increments failed counter") {
+        val registry = SimpleMeterRegistry()
+        val listener = MicrometerOutboxListener(registry)
+        val entry = stubEntry().toFailed(Instant.EPOCH, "bad request")
+
+        listener.onEntryProcessed(OutboxProcessingEvent.Failed(entry, Duration.ofMillis(5), "bad request"))
+
+        registry.counter("okapi.entries.failed").count() shouldBe 1.0
+    }
+
+    test("onBatchProcessed records duration in timer") {
+        val registry = SimpleMeterRegistry()
+        val listener = MicrometerOutboxListener(registry)
+
+        listener.onBatchProcessed(3, Duration.ofMillis(150))
+
+        val timer = registry.timer("okapi.batch.duration")
+        timer.count() shouldBe 1L
+        timer.totalTime(TimeUnit.MILLISECONDS) shouldBe 150.0
+    }
+})
diff --git a/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetricsTest.kt b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetricsTest.kt
new file mode 100644
index 0000000..901d0ab
--- /dev/null
+++ b/okapi-micrometer/src/test/kotlin/com/softwaremill/okapi/micrometer/MicrometerOutboxMetricsTest.kt
@@ -0,0 +1,115 @@
+package com.softwaremill.okapi.micrometer
+
+import com.softwaremill.okapi.core.OutboxEntry
+import com.softwaremill.okapi.core.OutboxStatus
+import com.softwaremill.okapi.core.OutboxStore
+import com.softwaremill.okapi.core.TransactionRunner
+import io.kotest.core.spec.style.FunSpec
+import io.kotest.matchers.shouldBe
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry
+import java.time.Clock
+import java.time.Instant
+import java.time.ZoneOffset
+
+/** [OutboxStore] stub: read queries answer from [counts] and [oldest]; write operations are inert. */
+private fun stubStore(counts: Map<OutboxStatus, Long> = emptyMap(), oldest: Map<OutboxStatus, Instant> = emptyMap()) =
+    object : OutboxStore {
+        override fun persist(entry: OutboxEntry) = entry
+        override fun claimPending(limit: Int) = emptyList<OutboxEntry>()
+        override fun updateAfterProcessing(entry: OutboxEntry) = entry
+        override fun removeDeliveredBefore(time: Instant, limit: Int) = 0
+
+        override fun findOldestCreatedAt(statuses: Set<OutboxStatus>) = oldest.filterKeys { it in statuses }
+
+        override fun countByStatuses() = counts
+    }
+
+/** [OutboxStore] stub whose read queries always throw [error]. */
+private fun throwingStore(error: Exception = RuntimeException("db down")) = object : OutboxStore {
+    override fun persist(entry: OutboxEntry) = entry
+    override fun claimPending(limit: Int) = emptyList<OutboxEntry>()
+    override fun updateAfterProcessing(entry: OutboxEntry) = entry
+    override fun removeDeliveredBefore(time: Instant, limit: Int) = 0
+    override fun findOldestCreatedAt(statuses: Set<OutboxStatus>): Map<OutboxStatus, Instant> = throw error
+    override fun countByStatuses(): Map<OutboxStatus, Long> = throw error
+}
+
+class MicrometerOutboxMetricsTest : FunSpec({
+
+    test("count gauge reflects store values per status") {
+        val store = stubStore(
+            counts = mapOf(OutboxStatus.PENDING to 5L, OutboxStatus.FAILED to 2L),
+        )
+        val registry = SimpleMeterRegistry()
+        MicrometerOutboxMetrics(store, registry)
+
+        val pendingGauge = registry.find("okapi.entries.count").tag("status", "pending").gauge()
+        val failedGauge = registry.find("okapi.entries.count").tag("status", "failed").gauge()
+        val deliveredGauge = registry.find("okapi.entries.count").tag("status", "delivered").gauge()
+
+        pendingGauge!!.value() shouldBe 5.0
+        failedGauge!!.value() shouldBe 2.0
+        deliveredGauge!!.value() shouldBe 0.0
+    }
+
+    test("lag gauge computes seconds between oldest entry and now") {
+        val now = Instant.parse("2024-01-01T00:01:00Z")
+        val oldest = Instant.parse("2024-01-01T00:00:00Z") // 60 seconds ago
+        val fixedClock = Clock.fixed(now, ZoneOffset.UTC)
+        val store = stubStore(oldest = mapOf(OutboxStatus.PENDING to oldest))
+        val registry = SimpleMeterRegistry()
+        MicrometerOutboxMetrics(store, registry, clock = fixedClock)
+
+        val lagGauge = registry.find("okapi.entries.lag.seconds").tag("status", "pending").gauge()
+        lagGauge!!.value() shouldBe 60.0
+    }
+
+    test("lag gauge returns 0 when no entries for status") {
+        val store = stubStore()
+        val registry = SimpleMeterRegistry()
+        MicrometerOutboxMetrics(store, registry)
+
+        val lagGauge = registry.find("okapi.entries.lag.seconds").tag("status", "pending").gauge()
+        lagGauge!!.value() shouldBe 0.0
+    }
+
+    test("transactionRunner wraps gauge queries when provided") {
+        val store = stubStore(counts = mapOf(OutboxStatus.PENDING to 7L))
+        val registry = SimpleMeterRegistry()
+        var wrapCount = 0
+        val recordingRunner = object : TransactionRunner {
+            override fun <T> runInTransaction(block: () -> T): T {
+                wrapCount++
+                return block()
+            }
+        }
+        MicrometerOutboxMetrics(store, registry, transactionRunner = recordingRunner)
+
+        val pendingGauge = registry.find("okapi.entries.count").tag("status", "pending").gauge()
+        pendingGauge!!.value() shouldBe 7.0
+        wrapCount shouldBe 1
+    }
+
+    test("gauge returns NaN and logs when store throws") {
+        val registry = SimpleMeterRegistry()
+        MicrometerOutboxMetrics(throwingStore(), registry)
+
+        val countGauge = registry.find("okapi.entries.count").tag("status", "pending").gauge()
+        val lagGauge = registry.find("okapi.entries.lag.seconds").tag("status", "pending").gauge()
+
+        countGauge!!.value().isNaN() shouldBe true
+        lagGauge!!.value().isNaN() shouldBe true
+    }
+
+    test("gauge returns NaN when transactionRunner throws") {
+        val store = stubStore(counts = mapOf(OutboxStatus.PENDING to 5L))
+        val registry = SimpleMeterRegistry()
+        val failingRunner = object : TransactionRunner {
+            override fun <T> runInTransaction(block: () -> T): T = throw IllegalStateException("no tx manager")
+        }
+        MicrometerOutboxMetrics(store, registry, transactionRunner = failingRunner)
+
+        val pendingGauge = registry.find("okapi.entries.count").tag("status", "pending").gauge()
+        pendingGauge!!.value().isNaN() shouldBe true
+    }
+})
diff --git a/okapi-spring-boot/build.gradle.kts b/okapi-spring-boot/build.gradle.kts
index 5512268..395d4c2 100644
--- a/okapi-spring-boot/build.gradle.kts
+++ b/okapi-spring-boot/build.gradle.kts
@@ -19,6 +19,8 @@ dependencies {
     compileOnly(project(":okapi-postgres"))
     compileOnly(project(":okapi-mysql"))
     compileOnly(libs.liquibaseCore)
+    compileOnly(project(":okapi-micrometer"))
+    compileOnly(libs.micrometerCore)
testImplementation(libs.kotestRunnerJunit5)
     testImplementation(libs.kotestAssertionsCore)
@@ -41,6 +43,8 @@ dependencies {
     testImplementation(libs.testcontainersMysql)
     testImplementation(libs.mysql)
     testImplementation(libs.wiremock)
+    testImplementation(project(":okapi-micrometer"))
+    testImplementation(libs.micrometerCore)
 }

 // CI version override: ./gradlew :okapi-spring-boot:test -PspringBootVersion=4.0.4 -PspringVersion=7.0.6
diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMicrometerAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMicrometerAutoConfiguration.kt
new file mode 100644
index 0000000..f8c7c54
--- /dev/null
+++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiMicrometerAutoConfiguration.kt
@@ -0,0 +1,80 @@
+package com.softwaremill.okapi.springboot
+
+import com.softwaremill.okapi.core.OutboxStore
+import com.softwaremill.okapi.micrometer.MicrometerOutboxListener
+import com.softwaremill.okapi.micrometer.MicrometerOutboxMetrics
+import io.micrometer.core.instrument.MeterRegistry
+import org.springframework.beans.factory.ObjectProvider
+import org.springframework.boot.autoconfigure.AutoConfiguration
+import org.springframework.boot.autoconfigure.AutoConfigureAfter
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
+import org.springframework.context.annotation.Bean
+import org.springframework.transaction.PlatformTransactionManager
+import org.springframework.transaction.support.TransactionTemplate
+import java.time.Clock
+
+/**
+ * Autoconfiguration for Okapi Micrometer observability beans.
+ *
+ * Separated from [OutboxAutoConfiguration] as a top-level autoconfiguration
+ * so that [ConditionalOnBean] for [MeterRegistry] evaluates after the meter
+ * registry is created by Spring Boot's metrics autoconfiguration.
+ *
+ * The whole configuration is skipped unless both Micrometer and the optional `okapi-micrometer`
+ * module are on the classpath: `okapi-micrometer` is only a `compileOnly` dependency here, so an
+ * application that has a [MeterRegistry] bean but not `okapi-micrometer` must not load bean
+ * methods whose signatures reference the missing classes.
+ *
+ * Both beans are `@ConditionalOnMissingBean` — define your own [MicrometerOutboxListener]
+ * or [MicrometerOutboxMetrics] bean to override the defaults.
+ */
+@AutoConfiguration
+@AutoConfigureAfter(
+    name = [
+        // Spring Boot 4.x location of the meter registry autoconfiguration.
+        "org.springframework.boot.micrometer.metrics.autoconfigure.CompositeMeterRegistryAutoConfiguration",
+        // Spring Boot 3.x location; a name that is not on the classpath is simply ignored.
+        "org.springframework.boot.actuate.autoconfigure.metrics.CompositeMeterRegistryAutoConfiguration",
+    ],
+)
+@ConditionalOnClass(
+    name = [
+        "io.micrometer.core.instrument.MeterRegistry",
+        "com.softwaremill.okapi.micrometer.MicrometerOutboxMetrics",
+    ],
+)
+@ConditionalOnBean(MeterRegistry::class)
+class OkapiMicrometerAutoConfiguration {
+    /** Default [MicrometerOutboxListener] bound to the application's [MeterRegistry]. */
+    @Bean
+    @ConditionalOnMissingBean
+    fun micrometerOutboxListener(registry: MeterRegistry): MicrometerOutboxListener = MicrometerOutboxListener(registry)
+
+    /**
+     * Default [MicrometerOutboxMetrics] for [store], registered in [registry].
+     *
+     * When a [PlatformTransactionManager] bean exists, the metrics receive a runner built on a
+     * read-only [TransactionTemplate]; otherwise no transaction runner is passed. The clock falls
+     * back to [Clock.systemUTC] when no [Clock] bean is defined.
+     */
+    @Bean
+    @ConditionalOnMissingBean
+    fun micrometerOutboxMetrics(
+        store: OutboxStore,
+        registry: MeterRegistry,
+        transactionManager: ObjectProvider<PlatformTransactionManager>,
+        clock: ObjectProvider<Clock>,
+    ): MicrometerOutboxMetrics {
+        val readOnlyRunner = transactionManager.getIfAvailable()?.let { tm ->
+            SpringTransactionRunner(TransactionTemplate(tm).apply { isReadOnly = true })
+        }
+        return MicrometerOutboxMetrics(
+            store = store,
+            registry = registry,
+            transactionRunner = readOnlyRunner,
+            clock = clock.getIfAvailable { Clock.systemUTC() },
+        )
+    }
+}
diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt
index 53268d8..123d367 100644
--- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt
+++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt
@@ -4,6 +4,7 @@ import com.softwaremill.okapi.core.CompositeMessageDeliverer
 import com.softwaremill.okapi.core.MessageDeliverer
 import
com.softwaremill.okapi.core.OutboxEntryProcessor
 import com.softwaremill.okapi.core.OutboxProcessor
+import com.softwaremill.okapi.core.OutboxProcessorListener
 import com.softwaremill.okapi.core.OutboxPublisher
 import com.softwaremill.okapi.core.OutboxPurgerConfig
 import com.softwaremill.okapi.core.OutboxSchedulerConfig
@@ -87,10 +88,17 @@ class OutboxAutoConfiguration(

     @Bean
     @ConditionalOnMissingBean
-    fun outboxProcessor(outboxStore: OutboxStore, outboxEntryProcessor: OutboxEntryProcessor): OutboxProcessor {
+    fun outboxProcessor(
+        outboxStore: OutboxStore,
+        outboxEntryProcessor: OutboxEntryProcessor,
+        listener: ObjectProvider<OutboxProcessorListener>,
+        clock: ObjectProvider<Clock>,
+    ): OutboxProcessor {
         return OutboxProcessor(
             store = outboxStore,
             entryProcessor = outboxEntryProcessor,
+            listener = listener.getIfAvailable(), // null when no listener bean is defined
+            clock = clock.getIfAvailable { Clock.systemUTC() }, // system UTC unless a Clock bean exists
         )
     }

diff --git a/okapi-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/okapi-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
index 8987c56..e19af6d 100644
--- a/okapi-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ b/okapi-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -1 +1,2 @@
 com.softwaremill.okapi.springboot.OutboxAutoConfiguration
+com.softwaremill.okapi.springboot.OkapiMicrometerAutoConfiguration
diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt
index 713755c..31752f0 100644
--- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt
+++
b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt
@@ -5,6 +5,8 @@ import com.softwaremill.okapi.core.MessageDeliverer
 import com.softwaremill.okapi.core.OutboxEntry
 import com.softwaremill.okapi.core.OutboxStatus
 import com.softwaremill.okapi.core.OutboxStore
+import com.softwaremill.okapi.micrometer.MicrometerOutboxListener
+import com.softwaremill.okapi.micrometer.MicrometerOutboxMetrics
 import io.kotest.core.spec.style.FunSpec
 import io.kotest.matchers.nulls.shouldNotBeNull
 import io.kotest.matchers.shouldBe
@@ -19,7 +21,7 @@ import javax.sql.DataSource

 class OutboxProcessorAutoConfigurationTest : FunSpec({
     val contextRunner = ApplicationContextRunner()
-        .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java))
+        .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java, OkapiMicrometerAutoConfiguration::class.java))
         .withBean(OutboxStore::class.java, { stubStore() })
         .withBean(MessageDeliverer::class.java, { stubDeliverer() })
         .withBean(DataSource::class.java, { SimpleDriverDataSource() })
@@ -85,6 +87,17 @@ class OutboxProcessorAutoConfigurationTest : FunSpec({
             callbackInvoked shouldBe true
         }
     }
+
+    test("Micrometer listener and metrics beans are registered when MeterRegistry is present") {
+        contextRunner
+            .withBean(io.micrometer.core.instrument.MeterRegistry::class.java, {
+                io.micrometer.core.instrument.simple.SimpleMeterRegistry()
+            })
+            .run { ctx ->
+                ctx.getBean(MicrometerOutboxListener::class.java).shouldNotBeNull()
+                ctx.getBean(MicrometerOutboxMetrics::class.java).shouldNotBeNull()
+            }
+    }
 })

 private fun stubStore() = object : OutboxStore {
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 1e1b7d3..832da0b 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -17,5 +17,6 @@ include("okapi-kafka")
 include("okapi-spring-boot")
 include("okapi-bom")
 include("okapi-integration-tests")
+include("okapi-micrometer")

 rootProject.name = "okapi"