From 516422efa90bd69967e1411942e054636e71c9c8 Mon Sep 17 00:00:00 2001 From: Jan Ove Kongshaug Date: Mon, 23 Mar 2026 09:55:41 +0100 Subject: [PATCH 01/10] feat: Kafka bootstrap and runtime health probes Introduce Kafka-aware readiness and liveness health handling for the consumer. Readiness is now based on initial Kafka bootstrap instead of a fixed startup delay. The application stays unready until the blocking listeners have consumed up to their startup end offsets, and then remains ready for the rest of the pod lifetime. This includes both the entity listener and the relation-update listener. Liveness is now separated from bootstrap and tracks Kafka runtime health for registered listeners. It reacts to Spring Kafka runtime events such as non-responsive consumers, failed starts and stopped consumers, while using a grace period to avoid false positives from short interruptions. Normal lag and quiet topics do not make the pod unhealthy. Also add Micrometer metrics for bootstrap progress and runtime Kafka health, including bootstrap duration, pending partitions, runtime problem counters and unhealthy state gauges. Update actuator health group configuration and add documentation for the new startup/readiness/liveness model, Kafka-specific health behavior, metrics and Kubernetes probe configuration. --- docs/kafka-health-checks.md | 268 ++++++++++++++++++ .../kafka/AutoRelationEntityConsumer.kt | 16 +- .../kafka/RelationUpdateConsumer.kt | 37 ++- .../health/BootstrapPartitionStatus.kt | 8 + .../health/BootstrapReadinessSnapshot.kt | 6 + .../consumer/health/EndOffsetProvider.kt | 42 +++ .../InitialKafkaBootstrapHealthIndicator.kt | 31 ++ .../health/InitialKafkaBootstrapTracker.kt | 193 +++++++++++++ .../health/KafkaHealthConfiguration.kt | 11 + .../consumer/health/KafkaHealthMetrics.kt | 190 +++++++++++++ .../consumer/health/KafkaHealthProperties.kt | 12 + .../KafkaListenerContainerHealthConfigurer.kt | 15 + .../consumer/health/KafkaListenerIds.kt | 9 + .../health/KafkaRuntimeHealthMonitor.kt | 161 +++++++++++ .../health/ListenerBootstrapStatus.kt | 10 + .../consumer/kafka/entity/EntityConsumer.kt | 31 +- .../kafka/event/EventResponseConsumer.kt | 16 +- .../kafka/event/RequestFintEventConsumer.kt | 16 +- src/main/resources/application.yaml | 20 ++ ...nitialKafkaBootstrapHealthIndicatorTest.kt | 29 ++ .../InitialKafkaBootstrapTrackerTest.kt | 191 +++++++++++++ .../health/KafkaRuntimeHealthMonitorTest.kt | 141 +++++++++ .../fintlabs/consumer/health/MutableClock.kt | 21 ++ .../entity/RelationUpdateConsumerTest.kt | 19 +- 24 files changed, 1471 insertions(+), 22 deletions(-) create mode 100644 docs/kafka-health-checks.md create mode 100644 src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt create mode 100644 src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt create mode 100644 src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt create mode 100644 src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt create mode 100644 src/test/java/no/fintlabs/consumer/health/MutableClock.kt diff --git a/docs/kafka-health-checks.md b/docs/kafka-health-checks.md new file mode 100644 index 00000000..a0379a8f --- /dev/null +++ b/docs/kafka-health-checks.md @@ -0,0 +1,268 @@ +# Kafka Health Checks + +Dette dokumentet beskriver hvordan health-checkene i `fint-core-consumer` fungerer etter innføringen av Kafka-basert readiness og liveness. + +## Oversikt + +Applikasjonen bruker tre forskjellige typer health/probes i Kubernetes: + +- `startupProbe`: brukes bare helt tidlig for å verifisere at JVM og Spring Boot faktisk starter. +- `readinessProbe`: brukes for å avgjøre om poden trygt kan motta trafikk. +- `livenessProbe`: brukes for å avgjøre om poden fortsatt lever, eller om Kubernetes skal restarte den. + +Disse probe-typene har forskjellig ansvar og skal ikke blandes: + +- `startupProbe` skal ikke vite noe om hvor langt Kafka-consumerne har kommet i bootstrap. +- `readinessProbe` skal blokkere trafikk til initial bootstrap er ferdig. +- `livenessProbe` skal ikke feile bare fordi applikasjonen ligger litt etter i konsumering; den skal feile hvis Kafka-consumerne i praksis har sluttet å fungere over tid. + +## Hvordan Consumer bruker dem + +Consumer eksponerer actuator-endepunktene: + +- `/actuator/health/readiness` +- `/actuator/health/liveness` + +Readiness er koblet til en initial Kafka-bootstrap-tracker. Liveness er koblet til en separat Kafka-runtime-monitor. + +## Readiness + +### Hensikt + +Readiness skal beskytte trafikk mot en pod som ennå ikke har bygd opp lokal cache ved oppstart. + +### Hvordan den virker + +Ved oppstart settes readiness til `REFUSING_TRAFFIC`. + +To listeners er definert som blokkerende for initial bootstrap: + +- `entity` +- `relation-update` + +For hver assigned partition hentes "startup end offset" fra Kafka i det assignment skjer. Deretter spores prosesserte offsets mens records behandles. + +Listeneren regnes som ferdig når alle dens assigned partitions har konsumert seg opp til offseten som gjaldt ved oppstartstidspunktet. + +Applikasjonen regnes som `ready` når alle blokkerende listeners er ferdige. + +### Viktig nyanse + +Dette er en `initial-only` readiness. + +Det betyr: + +- Readiness blokkerer ved oppstart. +- Readiness går til `UP` når initial bootstrap er ferdig. +- Readiness går ikke ned igjen senere bare fordi det kommer flere meldinger, full sync, eller midlertidig lag. + +Dette er bevisst. Etter at poden er sluppet i trafikk, skal vanlig Kafka-lag ikke stoppe lesetrafikk. + +### Hva får readiness til å feile + +Readiness blir `OUT_OF_SERVICE` hvis minst ett av disse forholdene gjelder under oppstart: + +- `entity` har ikke konsumert alle sine assigned partitions opp til startup-end-offset. +- `relation-update` har ikke konsumert alle sine assigned partitions opp til startup-end-offset. +- Kafka end-offset kan ikke hentes for assigned partitions. + +### Hva får ikke readiness til å feile + +Følgende forhold feiler ikke readiness etter at bootstrap er ferdig: + +- En ny full sync kommer inn og skaper lag. +- Det produseres mange meldinger mens appen kjører. +- Consumeren ligger midlertidig etter på topicet. +- Topicet er stille i lang tid. + +## Liveness + +### Hensikt + +Liveness skal oppdage at Kafka-consumerne i praksis har sluttet å fungere, og gi Kubernetes grunnlag for å restarte poden. + +### Hvordan den virker + +Liveness monitorerer runtime-status for registrerte Kafka-listeners. + +Den ser ikke på vanlig lag eller antall uprosesserte meldinger. I stedet ser den på Kafka-runtime-signaler: + +- `ConsumerStartedEvent` +- `ListenerContainerIdleEvent` +- `NonResponsiveConsumerEvent` +- `ConsumerFailedToStartEvent` +- `ConsumerStoppedEvent` + +Det brukes en grace-periode, default `15m`, for å unngå falske positive ved korte forstyrrelser. + +### Hva som holder liveness frisk + +Liveness holdes `UP` hvis en listener viser tegn til normal drift, for eksempel: + +- appen prosesserer records +- listeneren sender idle-events fordi topicet er stille +- consumer-containeren starter normalt + +Det betyr at stille topics ikke i seg selv gjør poden unhealthy. + +### Hva får liveness til å feile + +Liveness blir `DOWN` hvis en registrert listener har en runtime-feil som varer lenger enn konfigurert grace-periode. + +Eksempler: + +- `NonResponsiveConsumerEvent` og tilstanden varer lenger enn grace-perioden +- `ConsumerFailedToStartEvent` +- `ConsumerStoppedEvent` med annen grunn enn `NORMAL` + +Typiske scenarioer dette er ment å fange: + +- nettverksbrudd mellom pod og Kafka +- Kafka svarer ikke over tid +- autentiseringsfeil mot Kafka +- consumer-container stopper på grunn av feil + +### Hva får ikke liveness til å feile + +Følgende forhold skal ikke alene feile liveness: + +- vanlig Kafka-lag +- full sync som gjør at consumeren henger litt etter +- ingen nye meldinger på topicet i flere timer +- normal rebalance mellom pods + +En vanlig rebalance håndteres av partition assignment/revocation, ikke som en fatal liveness-feil. + +## Startup Probe + +### Hensikt + +`startupProbe` bør bare brukes som en enkel oppstartssperre mens JVM og Spring Boot kommer opp. + +Den bør ikke inneholde Kafka-bootstrap-logikk. Grunnen er at `startupProbe` bare brukes i den tidlige oppstartsfasen, mens readiness kan uttrykke "ikke klar enda" på en mer presis måte. + +### Anbefaling + +Bruk `startupProbe` mot en enkel actuator-health, mens readiness og liveness peker mot de dedikerte probe-endepunktene. + +## Konfigurasjon i Consumer + +Default konfigurasjon i applikasjonen: + +```yaml +fint: + consumer: + health: + kafka: + idle-event-interval: 1m + runtime-grace-period: 15m + monitor-interval-seconds: 30 + no-poll-threshold: 3.0 + +management: + endpoint: + health: + probes: + enabled: true + group: + readiness: + include: readinessState,initialKafkaBootstrap + liveness: + include: livenessState,kafkaRuntime +``` + +### Hva disse Kafka-innstillingene betyr + +- `idle-event-interval`: hvor ofte idle-event sendes mens et topic er stille. +- `runtime-grace-period`: hvor lenge runtime-feil kan vare før liveness går ned. +- `monitor-interval-seconds`: hvor ofte Spring Kafka sjekker consumerens poll-aktivitet. +- `no-poll-threshold`: terskel for når manglende poll anses som "non-responsive". + +## Metrikker + +I tillegg til actuator-health eksponerer applikasjonen nå Micrometer-metrikker for Kafka-bootstrap og Kafka-runtime-health. Disse er nyttige fordi health-endepunktene bare viser nåværende status, mens metrikker gjør det mulig å følge utvikling over tid i Prometheus og Grafana. + +### Bootstrap-metrikker + +- `fint.consumer.kafka.bootstrap.state` + Gauge per listener. `1` betyr at initial bootstrap fortsatt pågår, `0` betyr at den er ferdig. + +- `fint.consumer.kafka.bootstrap.partitions.pending` + Gauge per listener. Antall assigned partitions som ennå ikke har konsumert seg opp til startup-end-offset. + +- `fint.consumer.kafka.bootstrap.completed` + Counter per listener, og også for `listener=all`. Incrementes når bootstrap fullføres. + +- `fint.consumer.kafka.bootstrap.duration` + Timer per listener, og også for `listener=all`. Måler hvor lang tid initial bootstrap faktisk tok. + +- `fint.consumer.kafka.bootstrap.end_offset.lookup.failures` + Counter per listener. Incrementes når applikasjonen ikke klarer å hente startup end offset fra Kafka. + +### Runtime-metrikker + +- `fint.consumer.kafka.runtime.problem` + Counter med tags `listener` og `reason`. Incrementes når runtime-monitoren ser et problem, for eksempel `NON_RESPONSIVE` eller `STOPPED_AUTH`. + +- `fint.consumer.kafka.runtime.unhealthy` + Gauge per listener. `1` betyr at listeneren har vært i problemtilstand lenger enn grace-perioden og dermed gjør liveness `DOWN`. + +- `fint.consumer.kafka.runtime.problem.duration` + Gauge per listener. Viser hvor lenge den nåværende problemtilstanden har vart, i millisekunder. + +### Viktige tags + +Metrikkene er bevisst tagget lavt-kardinalt: + +- `listener` +- `reason` + +Det brukes ikke tags som Kafka-key, partition eller corrId, for å unngå høy kardinalitet og unødvendig støy i metrics-backend. + +## Eksempel i Kubernetes + +Eksempel på probe-oppsett: + +```yaml +startupProbe: + httpGet: + path: /utdanning/vurdering/actuator/health + port: 8080 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 60 + +readinessProbe: + httpGet: + path: /utdanning/vurdering/actuator/health/readiness + port: 8080 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 3 + +livenessProbe: + httpGet: + path: /utdanning/vurdering/actuator/health/liveness + port: 8080 + periodSeconds: 30 + timeoutSeconds: 3 + failureThreshold: 3 +``` + +## Praktiske konsekvenser + +Med dette oppsettet blir flyten typisk slik: + +1. Poden starter. +2. `startupProbe` verifiserer at appen faktisk kommer opp. +3. `readinessProbe` holder poden ute av trafikk mens `entity` og `relation-update` bygger initial cache. +4. Når initial bootstrap er ferdig, blir poden `Ready`. +5. Senere full sync eller vanlig Kafka-lag påvirker ikke readiness. +6. Hvis Kafka-consumerne blir ikke-responsive eller stopper over tid, blir `liveness` `DOWN` og Kubernetes kan restarte poden. + +## Oppsummering + +- `startupProbe` beskytter bare oppstart av prosessen. +- `readinessProbe` beskytter trafikk under initial Kafka-bootstrap. +- `livenessProbe` beskytter mot vedvarende Kafka-runtime-feil. +- Vanlig lag eller stille topics skal ikke gjøre poden unhealthy. diff --git a/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt b/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt index a1c2e15f..9efb31a2 100644 --- a/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt +++ b/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt @@ -2,6 +2,9 @@ package no.fintlabs.autorelation.kafka import no.fintlabs.autorelation.RelationEventService import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConstants.RESOURCE_NAME import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.entity.extractIdentifier @@ -21,18 +24,22 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class AutoRelationEntityConsumer( private val consumerConfig: ConsumerConfiguration, private val relationEventService: RelationEventService, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(AutoRelationEntityConsumer::class.java) private const val CONSUMER_NAME = "autorelation-entity" } - @Bean + @Bean(name = [KafkaListenerIds.AUTORELATION_ENTITY]) fun buildAutoRelationConsumer( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.AUTORELATION_ENTITY) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( Any::class.java, this::consumeRecord, @@ -54,6 +61,7 @@ class AutoRelationEntityConsumer( CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EntityTopicNameParameters .builder() @@ -66,6 +74,7 @@ class AutoRelationEntityConsumer( ).resourceName("${consumerConfig.domain}-${consumerConfig.packageName}") .build(), ).apply { concurrency = consumerConfig.kafka.entityConcurrency } + } fun consumeRecord(consumerRecord: ConsumerRecord) { consumerRecord @@ -77,6 +86,7 @@ class AutoRelationEntityConsumer( resource, ) } + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.AUTORELATION_ENTITY) } private fun ConsumerRecord.getResourceName(): String = diff --git a/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt b/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt index 886d4b9c..95624519 100644 --- a/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt +++ b/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt @@ -3,6 +3,10 @@ package no.fintlabs.autorelation.kafka import no.fintlabs.autorelation.AutoRelationService import no.fintlabs.autorelation.model.RelationUpdate import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.KafkaThroughputMetrics import no.novari.kafka.consuming.ErrorHandlerFactory @@ -23,13 +27,16 @@ class RelationUpdateConsumer( private val autoRelationService: AutoRelationService, private val consumerConfig: ConsumerConfiguration, private val kafkaThroughputMetrics: KafkaThroughputMetrics, + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(RelationUpdateConsumer::class.java) private const val CONSUMER_NAME = "relation-update" } - @Bean + @Bean(name = [KafkaListenerIds.RELATION_UPDATE]) @ConditionalOnProperty( name = ["fint.consumer.autorelation"], havingValue = "true", @@ -38,8 +45,11 @@ class RelationUpdateConsumer( fun relationUpdateConsumerContainer( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + initialKafkaBootstrapTracker.registerBlockingListener(KafkaListenerIds.RELATION_UPDATE) + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.RELATION_UPDATE) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( RelationUpdate::class.java, this::consumeRecord, @@ -48,14 +58,21 @@ class RelationUpdateConsumer( .groupIdApplicationDefault() .maxPollRecordsKafkaDefault() .maxPollIntervalKafkaDefault() - .seekToBeginningOnAssignment() - .build(), + .seekToBeginningAndPerformOperationOnAssignment { assignments -> + initialKafkaBootstrapTracker.onPartitionsAssigned( + KafkaListenerIds.RELATION_UPDATE, + assignments.keys, + ) + }.onRevocation { partitions -> + initialKafkaBootstrapTracker.onPartitionsRevoked(KafkaListenerIds.RELATION_UPDATE, partitions) + }.build(), errorHandlerFactory.createErrorHandler( KafkaConsumerErrorHandling.createLoggingErrorHandlerConfiguration( logger, CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EntityTopicNamePatternParameters .builder() @@ -68,7 +85,8 @@ class RelationUpdateConsumer( // Makes sure we listen to component patterns such as utdanning-vurdering'-relation-update' ).resource(TopicNamePatternParameterPattern.endingWith("-relation-update")) .build(), - ).apply { consumerConfig.kafka.relationConcurrency } + ).apply { concurrency = consumerConfig.kafka.relationConcurrency } + } fun consumeRecord(consumerRecord: ConsumerRecord) { val startedAt = System.nanoTime() @@ -76,6 +94,8 @@ class RelationUpdateConsumer( if (relationUpdate == null) { kafkaThroughputMetrics.recordRelationUpdateConsumer(null, "ignored_null", System.nanoTime() - startedAt) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) return } @@ -85,6 +105,8 @@ class RelationUpdateConsumer( "ignored_foreign_component", System.nanoTime() - startedAt, ) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) return } @@ -95,12 +117,15 @@ class RelationUpdateConsumer( "processed", System.nanoTime() - startedAt, ) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) } catch (ex: Exception) { kafkaThroughputMetrics.recordRelationUpdateConsumer( relationUpdate.targetEntity.resourceName, "failed", System.nanoTime() - startedAt, ) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) throw ex } } diff --git a/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt b/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt new file mode 100644 index 00000000..c283e012 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt @@ -0,0 +1,8 @@ +package no.fintlabs.consumer.health + +data class BootstrapPartitionStatus( + val partition: String, + val endOffset: Long, + val processedOffset: Long?, + val caughtUp: Boolean, +) diff --git a/src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt b/src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt new file mode 100644 index 00000000..1b352953 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt @@ -0,0 +1,6 @@ +package no.fintlabs.consumer.health + +data class BootstrapReadinessSnapshot( + val ready: Boolean, + val blockingListeners: List, +) diff --git a/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt b/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt new file mode 100644 index 00000000..88268de1 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt @@ -0,0 +1,42 @@ +package no.fintlabs.consumer.health + +import jakarta.annotation.PreDestroy +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.OffsetSpec +import org.apache.kafka.common.TopicPartition +import org.springframework.boot.autoconfigure.kafka.KafkaProperties +import org.springframework.stereotype.Component +import java.time.Duration +import java.util.concurrent.TimeUnit + +interface EndOffsetProvider { + fun latestOffsets(partitions: Set): Map +} + +@Component +class KafkaAdminEndOffsetProvider( + kafkaProperties: KafkaProperties, +) : EndOffsetProvider { + private val adminClient = AdminClient.create(kafkaProperties.buildAdminProperties(null)) + + override fun latestOffsets(partitions: Set): Map { + if (partitions.isEmpty()) { + return emptyMap() + } + + return adminClient + .listOffsets(partitions.associateWith { OffsetSpec.latest() }) + .all() + .get(TIMEOUT.toSeconds(), TimeUnit.SECONDS) + .mapValues { (_, result) -> result.offset() } + } + + @PreDestroy + fun close() { + adminClient.close(TIMEOUT) + } + + companion object { + private val TIMEOUT = Duration.ofSeconds(10) + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt new file mode 100644 index 00000000..fbe89c6c --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt @@ -0,0 +1,31 @@ +package no.fintlabs.consumer.health + +import org.springframework.boot.actuate.health.Health +import org.springframework.boot.actuate.health.HealthIndicator +import org.springframework.stereotype.Component + +@Component("initialKafkaBootstrap") +class InitialKafkaBootstrapHealthIndicator( + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, +) : HealthIndicator { + override fun health(): Health { + val snapshot = initialKafkaBootstrapTracker.snapshot() + val builder = if (snapshot.ready) Health.up() else Health.outOfService() + + return builder + .withDetail("ready", snapshot.ready) + .withDetail("blockingListeners", snapshot.blockingListeners.size) + .withDetail( + "listeners", + snapshot.blockingListeners.associate { listener -> + listener.listenerId to + mapOf( + "assignmentSeen" to listener.assignmentSeen, + "completed" to listener.completed, + "assignedPartitions" to listener.assignedPartitions, + "caughtUpPartitions" to listener.caughtUpPartitions, + ) + }, + ).build() + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt new file mode 100644 index 00000000..2756b390 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt @@ -0,0 +1,193 @@ +package no.fintlabs.consumer.health + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.slf4j.LoggerFactory +import org.springframework.boot.availability.AvailabilityChangeEvent +import org.springframework.boot.availability.ReadinessState +import org.springframework.context.ApplicationContext +import org.springframework.stereotype.Service +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference +import kotlin.math.max + +@Service +class InitialKafkaBootstrapTracker( + private val endOffsetProvider: EndOffsetProvider, + private val applicationContext: ApplicationContext, + private val kafkaHealthMetrics: KafkaHealthMetrics, +) { + private val readinessPublished = AtomicReference(null) + private val bootstrapCompleted = AtomicBoolean(false) + private val blockingListeners = ConcurrentHashMap() + + init { + publishReadiness(false) + } + + fun registerBlockingListener(listenerId: String) { + blockingListeners.computeIfAbsent(listenerId) { ListenerBootstrapState() } + kafkaHealthMetrics.registerBootstrapListener(listenerId) + } + + fun onPartitionsAssigned( + listenerId: String, + assignments: Set, + ) { + if (bootstrapCompleted.get() || assignments.isEmpty()) { + return + } + + val listenerState = blockingListeners[listenerId] ?: return + val endOffsets = + try { + endOffsetProvider.latestOffsets(assignments) + } catch (exception: RuntimeException) { + logger.error( + "Failed to fetch end offsets for listener={} assignments={}", + listenerId, + assignments, + exception, + ) + kafkaHealthMetrics.recordBootstrapEndOffsetLookupFailure(listenerId) + publishReadiness(false) + return + } + + listenerState.assignmentSeen.set(true) + assignments.forEach { topicPartition -> + listenerState.partitions[topicPartition] = PartitionBootstrapState(endOffsets[topicPartition] ?: 0L) + } + + maybeCompleteListener(listenerId, listenerState) + maybeCompleteBootstrap() + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + } + + fun onPartitionsRevoked( + listenerId: String, + partitions: Collection, + ) { + if (bootstrapCompleted.get()) { + return + } + + val listenerState = blockingListeners[listenerId] ?: return + partitions.forEach(listenerState.partitions::remove) + maybeCompleteListener(listenerId, listenerState) + maybeCompleteBootstrap() + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + } + + fun onRecordProcessed( + listenerId: String, + record: ConsumerRecord<*, *>, + ) { + if (bootstrapCompleted.get()) { + return + } + + val listenerState = blockingListeners[listenerId] ?: return + val topicPartition = TopicPartition(record.topic(), record.partition()) + listenerState.partitions.computeIfPresent(topicPartition) { _, state -> + state.withOffset(record.offset()) + } + maybeCompleteListener(listenerId, listenerState) + maybeCompleteBootstrap() + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + } + + fun snapshot(): BootstrapReadinessSnapshot { + val listenerStatuses = + blockingListeners.toSortedMap().map { (listenerId, listenerState) -> + ListenerBootstrapStatus( + listenerId = listenerId, + assignmentSeen = listenerState.assignmentSeen.get(), + completed = listenerState.completed.get(), + assignedPartitions = listenerState.partitions.size, + caughtUpPartitions = listenerState.partitions.values.count(PartitionBootstrapState::caughtUp), + partitions = + listenerState.partitions + .toSortedMap(compareBy({ it.topic() }, { it.partition() })) + .map { (topicPartition, partitionState) -> + BootstrapPartitionStatus( + partition = "${topicPartition.topic()}-${topicPartition.partition()}", + endOffset = partitionState.endOffset, + processedOffset = partitionState.processedOffset, + caughtUp = partitionState.caughtUp, + ) + }, + ) + } + + return BootstrapReadinessSnapshot( + ready = bootstrapCompleted.get(), + blockingListeners = listenerStatuses, + ) + } + + private fun maybeCompleteListener( + listenerId: String, + listenerState: ListenerBootstrapState, + ) { + if ( + !listenerState.completed.get() && + listenerState.assignmentSeen.get() && + listenerState.partitions.values.all(PartitionBootstrapState::caughtUp) + ) { + listenerState.completed.set(true) + kafkaHealthMetrics.markBootstrapCompleted(listenerId) + logger.info("Initial Kafka bootstrap completed for listener={}", listenerId) + } + } + + private fun maybeCompleteBootstrap() { + if ( + !bootstrapCompleted.get() && + blockingListeners.isNotEmpty() && + blockingListeners.values.all { it.completed.get() } + ) { + bootstrapCompleted.set(true) + kafkaHealthMetrics.markBootstrapAllCompleted() + publishReadiness(true) + logger.info("Initial Kafka bootstrap completed for all blocking listeners") + } + } + + private fun pendingPartitions(listenerState: ListenerBootstrapState): Int { + return listenerState.partitions.values.count { !it.caughtUp } + } + + private fun publishReadiness(ready: Boolean) { + val changed = readinessPublished.getAndSet(ready) != ready + if (changed) { + AvailabilityChangeEvent.publish( + applicationContext, + if (ready) ReadinessState.ACCEPTING_TRAFFIC else ReadinessState.REFUSING_TRAFFIC, + ) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(InitialKafkaBootstrapTracker::class.java) + } +} + +private class ListenerBootstrapState { + val completed = AtomicBoolean(false) + val assignmentSeen = AtomicBoolean(false) + val partitions = ConcurrentHashMap() +} + +private data class PartitionBootstrapState( + val endOffset: Long, + val processedOffset: Long? = null, +) { + val caughtUp: Boolean + get() = endOffset == 0L || ((processedOffset ?: -1L) + 1) >= endOffset + + fun withOffset(offset: Long): PartitionBootstrapState { + return copy(processedOffset = processedOffset?.let { max(it, offset) } ?: offset) + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt b/src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt new file mode 100644 index 00000000..d5a1004b --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt @@ -0,0 +1,11 @@ +package no.fintlabs.consumer.health + +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import java.time.Clock + +@Configuration +class KafkaHealthConfiguration { + @Bean + fun kafkaHealthClock(): Clock = Clock.systemUTC() +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt b/src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt new file mode 100644 index 00000000..210dcac9 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt @@ -0,0 +1,190 @@ +package no.fintlabs.consumer.health + +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tag +import io.micrometer.core.instrument.Timer +import org.springframework.stereotype.Component +import java.time.Clock +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + +@Component +class KafkaHealthMetrics( + private val meterRegistry: MeterRegistry, + private val clock: Clock, + private val kafkaHealthProperties: KafkaHealthProperties, +) { + private val counters = ConcurrentHashMap() + private val timers = ConcurrentHashMap() + private val bootstrapStates = ConcurrentHashMap() + private val runtimeStates = ConcurrentHashMap() + private val bootstrapAllStartNanos = AtomicLong(System.nanoTime()) + private val bootstrapAllRecorded = AtomicBoolean(false) + + fun registerBootstrapListener(listenerId: String) { + bootstrapStates.computeIfAbsent(listenerId) { + BootstrapMetricState(System.nanoTime()).also { state -> + meterRegistry.gauge( + "fint.consumer.kafka.bootstrap.state", + listOf(Tag.of("listener", listenerId)), + state, + ) { it.inProgress.get().toDouble() } + meterRegistry.gauge( + "fint.consumer.kafka.bootstrap.partitions.pending", + listOf(Tag.of("listener", listenerId)), + state, + ) { it.pendingPartitions.get().toDouble() } + } + } + } + + fun updateBootstrapPendingPartitions( + listenerId: String, + pendingPartitions: Int, + ) { + bootstrapStates[listenerId]?.pendingPartitions?.set(pendingPartitions) + } + + fun markBootstrapCompleted(listenerId: String) { + bootstrapStates[listenerId]?.let { state -> + state.pendingPartitions.set(0) + state.inProgress.set(0) + if (state.completed.compareAndSet(false, true)) { + counter( + "fint.consumer.kafka.bootstrap.completed", + listOf(Tag.of("listener", listenerId)), + ).increment() + timer( + "fint.consumer.kafka.bootstrap.duration", + listOf(Tag.of("listener", listenerId)), + ).record(System.nanoTime() - state.startNanos, TimeUnit.NANOSECONDS) + } + } + } + + fun markBootstrapAllCompleted() { + if (bootstrapAllRecorded.compareAndSet(false, true)) { + counter( + "fint.consumer.kafka.bootstrap.completed", + listOf(Tag.of("listener", "all")), + ).increment() + timer( + "fint.consumer.kafka.bootstrap.duration", + listOf(Tag.of("listener", "all")), + ).record(System.nanoTime() - bootstrapAllStartNanos.get(), TimeUnit.NANOSECONDS) + } + } + + fun recordBootstrapEndOffsetLookupFailure(listenerId: String) { + counter( + "fint.consumer.kafka.bootstrap.end_offset.lookup.failures", + listOf(Tag.of("listener", listenerId)), + ).increment() + } + + fun registerRuntimeListener(listenerId: String) { + runtimeStates.computeIfAbsent(listenerId) { + RuntimeMetricState().also { state -> + meterRegistry.gauge( + "fint.consumer.kafka.runtime.unhealthy", + listOf(Tag.of("listener", listenerId)), + state, + ) { + if (it.isUnhealthy( + clock.millis(), + kafkaHealthProperties.runtimeGracePeriod.toMillis(), + ) + ) { + 1.0 + } else { + 0.0 + } + } + meterRegistry.gauge( + "fint.consumer.kafka.runtime.problem.duration", + listOf(Tag.of("listener", listenerId)), + state, + ) { it.problemDuration(clock.millis()).toDouble() } + } + } + } + + fun markRuntimeHealthy(listenerId: String) { + runtimeStates[listenerId]?.markHealthy(clock.millis()) + } + + fun markRuntimeProblem( + listenerId: String, + reason: String, + ) { + runtimeStates[listenerId]?.markProblem(clock.millis()) + counter( + "fint.consumer.kafka.runtime.problem", + listOf(Tag.of("listener", listenerId), Tag.of("reason", reason)), + ).increment() + } + + private fun counter( + name: String, + tags: List, + ): Counter { + return counters.computeIfAbsent(meterKey(name, tags)) { meterRegistry.counter(name, tags) } + } + + private fun timer( + name: String, + tags: List, + ): Timer { + return timers.computeIfAbsent(meterKey(name, tags)) { meterRegistry.timer(name, tags) } + } + + private fun meterKey( + name: String, + tags: List, + ): String { + return "$name|${tags.joinToString("|") { "${it.key}=${it.value}" }}" + } +} + +private class BootstrapMetricState( + val startNanos: Long, +) { + val pendingPartitions = AtomicInteger(0) + val inProgress = AtomicInteger(1) + val completed = AtomicBoolean(false) +} + +private class RuntimeMetricState { + private val problemSince = AtomicLong(0L) + + fun markHealthy(now: Long) { + problemSince.set(0L) + } + + fun markProblem(now: Long) { + problemSince.compareAndSet(0L, now) + } + + fun problemDuration(now: Long): Long { + return problemSince + .get() + .takeIf { it > 0L } + ?.let { now - it } + ?: 0L + } + + fun isUnhealthy( + now: Long, + gracePeriodMs: Long, + ): Boolean { + return problemSince + .get() + .takeIf { it > 0L } + ?.let { now - it >= gracePeriodMs } + ?: false + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt b/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt new file mode 100644 index 00000000..9729bb40 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt @@ -0,0 +1,12 @@ +package no.fintlabs.consumer.health + +import org.springframework.boot.context.properties.ConfigurationProperties +import java.time.Duration + +@ConfigurationProperties(prefix = "fint.consumer.health.kafka") +data class KafkaHealthProperties( + val idleEventInterval: Duration = Duration.ofMinutes(1), + val runtimeGracePeriod: Duration = Duration.ofMinutes(15), + val monitorIntervalSeconds: Int = 30, + val noPollThreshold: Float = 3.0f, +) diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt b/src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt new file mode 100644 index 00000000..eba005e7 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt @@ -0,0 +1,15 @@ +package no.fintlabs.consumer.health + +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer +import org.springframework.stereotype.Component + +@Component +class KafkaListenerContainerHealthConfigurer( + private val kafkaHealthProperties: KafkaHealthProperties, +) { + fun customize(container: ConcurrentMessageListenerContainer) { + container.containerProperties.idleEventInterval = kafkaHealthProperties.idleEventInterval.toMillis() + container.containerProperties.monitorInterval = kafkaHealthProperties.monitorIntervalSeconds + container.containerProperties.noPollThreshold = kafkaHealthProperties.noPollThreshold + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt b/src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt new file mode 100644 index 00000000..d4e15e80 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt @@ -0,0 +1,9 @@ +package no.fintlabs.consumer.health + +object KafkaListenerIds { + const val ENTITY = "resourceEntityConsumerFactory" + const val REQUEST_EVENT = "requestFintEventRequestListenerContainer" + const val RESPONSE_EVENT = "responseFintEventContainerListener" + const val RELATION_UPDATE = "relationUpdateConsumerContainer" + const val AUTORELATION_ENTITY = "buildAutoRelationConsumer" +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt b/src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt new file mode 100644 index 00000000..f94705ed --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt @@ -0,0 +1,161 @@ +package no.fintlabs.consumer.health + +import org.springframework.boot.actuate.health.Health +import org.springframework.boot.actuate.health.HealthIndicator +import org.springframework.context.event.EventListener +import org.springframework.kafka.event.ConsumerFailedToStartEvent +import org.springframework.kafka.event.ConsumerStartedEvent +import org.springframework.kafka.event.ConsumerStoppedEvent +import org.springframework.kafka.event.KafkaEvent +import org.springframework.kafka.event.ListenerContainerIdleEvent +import org.springframework.kafka.event.NonResponsiveConsumerEvent +import org.springframework.kafka.listener.MessageListenerContainer +import org.springframework.stereotype.Component +import java.time.Clock +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicReference + +@Component("kafkaRuntime") +class KafkaRuntimeHealthMonitor( + private val kafkaHealthProperties: KafkaHealthProperties, + private val clock: Clock, + private val kafkaHealthMetrics: KafkaHealthMetrics, +) : HealthIndicator { + private val trackedListeners = ConcurrentHashMap.newKeySet() + private val listenerStates = ConcurrentHashMap() + + fun registerListener(listenerId: String) { + trackedListeners.add(listenerId) + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) } + kafkaHealthMetrics.registerRuntimeListener(listenerId) + } + + fun onRecordProcessed(listenerId: String) { + if (!trackedListeners.contains(listenerId)) { + return + } + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) }.markHealthy(now()) + kafkaHealthMetrics.markRuntimeHealthy(listenerId) + } + + @EventListener + fun onConsumerStarted(event: ConsumerStartedEvent) { + listenerIdOf(event)?.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) }.markHealthy(now()) + kafkaHealthMetrics.markRuntimeHealthy(listenerId) + } + } + + @EventListener + fun onListenerContainerIdle(event: ListenerContainerIdleEvent) { + event.listenerId.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) }.markHealthy(now()) + kafkaHealthMetrics.markRuntimeHealthy(listenerId) + } + } + + @EventListener + fun onNonResponsiveConsumer(event: NonResponsiveConsumerEvent) { + event.listenerId.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates + .computeIfAbsent( + listenerId, + ) { ListenerRuntimeState(now()) } + .markProblem("NON_RESPONSIVE", now()) + kafkaHealthMetrics.markRuntimeProblem(listenerId, "NON_RESPONSIVE") + } + } + + @EventListener + fun onConsumerFailedToStart(event: ConsumerFailedToStartEvent) { + listenerIdOf(event)?.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates + .computeIfAbsent( + listenerId, + ) { ListenerRuntimeState(now()) } + .markProblem("FAILED_TO_START", now()) + kafkaHealthMetrics.markRuntimeProblem(listenerId, "FAILED_TO_START") + } + } + + @EventListener + fun onConsumerStopped(event: ConsumerStoppedEvent) { + if (event.reason == ConsumerStoppedEvent.Reason.NORMAL) { + return + } + + listenerIdOf(event)?.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates + .computeIfAbsent(listenerId) { ListenerRuntimeState(now()) } + .markProblem("STOPPED_${event.reason.name}", now()) + kafkaHealthMetrics.markRuntimeProblem(listenerId, "STOPPED_${event.reason.name}") + } + } + + override fun health(): Health { + val now = now() + val unhealthyListeners = + trackedListeners + .mapNotNull { listenerId -> + listenerStates[listenerId] + ?.takeIf { it.isUnhealthy(now, kafkaHealthProperties.runtimeGracePeriod.toMillis()) } + ?.let { listenerId to it.snapshot(now) } + }.toMap() + + val builder = if (unhealthyListeners.isEmpty()) Health.up() else Health.down() + + return builder + .withDetail("trackedListeners", trackedListeners.size) + .withDetail("runtimeGracePeriodMs", kafkaHealthProperties.runtimeGracePeriod.toMillis()) + .withDetail("unhealthyListeners", unhealthyListeners) + .build() + } + + private fun listenerIdOf(event: KafkaEvent): String? { + return runCatching { + event.getContainer(MessageListenerContainer::class.java).listenerId + }.getOrNull() + } + + private fun now(): Long = clock.millis() +} + +private class ListenerRuntimeState( + initialHealthyAt: Long, +) { + private val lastHealthyAt = AtomicLong(initialHealthyAt) + private val problemSince = AtomicLong(0L) + private val problem = AtomicReference(null) + + fun markHealthy(now: Long) { + lastHealthyAt.set(now) + problemSince.set(0L) + problem.set(null) + } + + fun markProblem( + reason: String, + now: Long, + ) { + problem.compareAndSet(null, reason) + problemSince.compareAndSet(0L, now) + } + + fun isUnhealthy( + now: Long, + gracePeriodMs: Long, + ): Boolean = + problemSince + .get() + .takeIf { it > 0L } + ?.let { now - it >= gracePeriodMs } + ?: false + + fun snapshot(now: Long): Map = + mapOf( + "problem" to problem.get(), + "problemDurationMs" to (now - problemSince.get()), + "lastHealthyAtMs" to lastHealthyAt.get(), + ) +} diff --git a/src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt b/src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt new file mode 100644 index 00000000..8aff7fa3 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt @@ -0,0 +1,10 @@ +package no.fintlabs.consumer.health + +data class ListenerBootstrapStatus( + val listenerId: String, + val assignmentSeen: Boolean, + val completed: Boolean, + val assignedPartitions: Int, + val caughtUpPartitions: Int, + val partitions: List, +) diff --git a/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt index a4125075..dab154db 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt @@ -1,6 +1,10 @@ package no.fintlabs.consumer.kafka.entity import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConstants.RESOURCE_NAME import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.stringValue @@ -21,18 +25,24 @@ class EntityConsumer( private val entityProcessingService: EntityProcessingService, private val consumerConfig: ConsumerConfiguration, private val resourceConverter: ResourceConverter, + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(EntityConsumer::class.java) private const val CONSUMER_NAME = "entity" } - @Bean + @Bean(name = [KafkaListenerIds.ENTITY]) fun resourceEntityConsumerFactory( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + initialKafkaBootstrapTracker.registerBlockingListener(KafkaListenerIds.ENTITY) + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.ENTITY) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( Any::class.java, this::consumeRecord, @@ -41,14 +51,18 @@ class EntityConsumer( .groupIdApplicationDefault() .maxPollRecordsKafkaDefault() .maxPollIntervalKafkaDefault() - .seekToBeginningOnAssignment() - .build(), + .seekToBeginningAndPerformOperationOnAssignment { assignments -> + initialKafkaBootstrapTracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, assignments.keys) + }.onRevocation { partitions -> + initialKafkaBootstrapTracker.onPartitionsRevoked(KafkaListenerIds.ENTITY, partitions) + }.build(), errorHandlerFactory.createErrorHandler( KafkaConsumerErrorHandling.createLoggingErrorHandlerConfiguration( logger, CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EntityTopicNameParameters .builder() @@ -61,10 +75,15 @@ class EntityConsumer( ).resourceName("${consumerConfig.domain}-${consumerConfig.packageName}") .build(), ).apply { concurrency = consumerConfig.kafka.entityConcurrency } + } fun consumeRecord(consumerRecord: ConsumerRecord) = createEntityConsumerRecord(consumerRecord) - .let { entityProcessingService.processEntityConsumerRecord(it) } + .also { entityConsumerRecord -> + entityProcessingService.processEntityConsumerRecord(entityConsumerRecord) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.ENTITY, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.ENTITY) + } private fun createEntityConsumerRecord(consumerRecord: ConsumerRecord) = consumerRecord.getResourceName().let { resourceName -> diff --git a/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt index 7082ea03..f8b9029f 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt @@ -2,6 +2,9 @@ package no.fintlabs.consumer.kafka.event import no.fintlabs.adapter.models.event.ResponseFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.resource.event.EventStatusCache import no.novari.kafka.consuming.ErrorHandlerFactory @@ -19,13 +22,17 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class EventResponseConsumer( private val consumerConfig: ConsumerConfiguration, private val eventStatusCache: EventStatusCache, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { - @Bean + @Bean(name = [KafkaListenerIds.RESPONSE_EVENT]) fun responseFintEventContainerListener( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.RESPONSE_EVENT) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( ResponseFintEvent::class.java, this::consumeRecord, @@ -42,6 +49,7 @@ class EventResponseConsumer( CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EventTopicNameParameters .builder() @@ -54,10 +62,12 @@ class EventResponseConsumer( ).eventName("${consumerConfig.domain}-${consumerConfig.packageName}-response") .build(), ).apply { concurrency = consumerConfig.kafka.responseConcurrency } + } private fun consumeRecord(consumerRecord: ConsumerRecord) { logger.info("Received Response: {}", consumerRecord.value()) eventStatusCache.trackResponse(consumerRecord.value().corrId, consumerRecord.value()) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RESPONSE_EVENT) } companion object { diff --git a/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt index 5244c157..5bfb758c 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt @@ -2,6 +2,9 @@ package no.fintlabs.consumer.kafka.event import no.fintlabs.adapter.models.event.RequestFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.resource.event.EventStatusCache import no.novari.kafka.consuming.ErrorHandlerFactory @@ -19,18 +22,22 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class RequestFintEventConsumer( private val consumerConfig: ConsumerConfiguration, private val eventStatusCache: EventStatusCache, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(RequestFintEventConsumer::class.java) private const val CONSUMER_NAME = "request-fint-event" } - @Bean + @Bean(name = [KafkaListenerIds.REQUEST_EVENT]) fun requestFintEventRequestListenerContainer( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.REQUEST_EVENT) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( RequestFintEvent::class.java, this::consumeRecord, @@ -47,6 +54,7 @@ class RequestFintEventConsumer( CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EventTopicNameParameters .builder() @@ -59,9 +67,11 @@ class RequestFintEventConsumer( ).eventName("${consumerConfig.domain}-${consumerConfig.packageName}-request") .build(), ).apply { concurrency = consumerConfig.kafka.requestConcurrency } + } private fun consumeRecord(consumerRecord: ConsumerRecord) { logger.info("Received Request: {}", consumerRecord.key()) eventStatusCache.trackRequest(consumerRecord.value()) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.REQUEST_EVENT) } } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 1ab24193..306f36ea 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -5,8 +5,12 @@ fint: exposed-endpoints: - /actuator/prometheus - /actuator/health + - /actuator/health/readiness + - /actuator/health/liveness + relation: base-url: https://api.felleskomponent.no + consumer: pod-url: http://fint-core-consumer-${fint.consumer.domain}-${fint.consumer.package}:8080 base-url: ${fint.relation.base-url} @@ -15,6 +19,12 @@ fint: packageName: ${fint.consumer.package} org-id: ${fint.org-id} coreVersionHeader: 2 + health: + kafka: + idle-event-interval: 1m + runtime-grace-period: 15m + monitor-interval-seconds: 30 + no-poll-threshold: 3.0 novari: kafka: @@ -34,6 +44,16 @@ spring: base-path: ${fint.consumer.domain}/${fint.consumer.package} management: + endpoint: + health: + probes: + enabled: true + group: + readiness: + include: readinessState,initialKafkaBootstrap + liveness: + include: livenessState,kafkaRuntime + endpoints: web: exposure: diff --git a/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt new file mode 100644 index 00000000..0c50fff9 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt @@ -0,0 +1,29 @@ +package no.fintlabs.consumer.health + +import io.mockk.every +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.springframework.boot.actuate.health.Status + +class InitialKafkaBootstrapHealthIndicatorTest { + private val tracker: InitialKafkaBootstrapTracker = mockk() + + @Test + fun `should report out of service while bootstrap is incomplete`() { + every { tracker.snapshot() } returns BootstrapReadinessSnapshot(false, emptyList()) + + val health = InitialKafkaBootstrapHealthIndicator(tracker).health() + + assertEquals(Status.OUT_OF_SERVICE, health.status) + } + + @Test + fun `should report up when bootstrap is complete`() { + every { tracker.snapshot() } returns BootstrapReadinessSnapshot(true, emptyList()) + + val health = InitialKafkaBootstrapHealthIndicator(tracker).health() + + assertEquals(Status.UP, health.status) + } +} diff --git a/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt new file mode 100644 index 00000000..28d0b3b8 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt @@ -0,0 +1,191 @@ +package no.fintlabs.consumer.health + +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import io.mockk.every +import io.mockk.mockk +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.springframework.context.ApplicationContext +import java.time.Duration +import java.time.Instant + +class InitialKafkaBootstrapTrackerTest { + private val endOffsetProvider: EndOffsetProvider = mockk() + private val applicationContext: ApplicationContext = mockk(relaxed = true) + private val meterRegistry = SimpleMeterRegistry() + private val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + private val kafkaHealthMetrics = KafkaHealthMetrics(meterRegistry, clock, KafkaHealthProperties()) + + private lateinit var tracker: InitialKafkaBootstrapTracker + + @BeforeEach + fun setUp() { + tracker = InitialKafkaBootstrapTracker(endOffsetProvider, applicationContext, kafkaHealthMetrics) + tracker.registerBlockingListener(KafkaListenerIds.ENTITY) + } + + @Test + fun `should stay unready until all assigned partitions catch up`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition0, partition1)) } returns + mapOf(partition0 to 2L, partition1 to 1L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0, partition1)) + + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 1, 0L, "key", "value")) + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 1L, "key", "value")) + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should become ready immediately when assigned partitions are empty at startup offset`() { + val partition0 = TopicPartition("topic", 0) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } returns + mapOf(partition0 to 0L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should ignore new assignments after initial bootstrap has completed`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } returns mapOf(partition0 to 1L) + every { endOffsetProvider.latestOffsets(setOf(partition1)) } returns mapOf(partition1 to 2L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + + assertTrue(tracker.snapshot().ready) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition1)) + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should wait for both entity and relation update listeners`() { + val entityPartition = TopicPartition("entity-topic", 0) + val relationPartition = TopicPartition("relation-topic", 0) + + tracker.registerBlockingListener(KafkaListenerIds.RELATION_UPDATE) + + every { endOffsetProvider.latestOffsets(setOf(entityPartition)) } returns mapOf(entityPartition to 1L) + every { endOffsetProvider.latestOffsets(setOf(relationPartition)) } returns mapOf(relationPartition to 1L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(entityPartition)) + tracker.onPartitionsAssigned(KafkaListenerIds.RELATION_UPDATE, setOf(relationPartition)) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("entity-topic", 0, 0L, "key", "value")) + + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed( + KafkaListenerIds.RELATION_UPDATE, + ConsumerRecord("relation-topic", 0, 0L, "key", "value"), + ) + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should publish bootstrap metrics for completion and pending partitions`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition0, partition1)) } returns + mapOf(partition0 to 2L, partition1 to 1L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0, partition1)) + + assertEquals( + 2.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.partitions.pending") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.state") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + + clock.advance(Duration.ofSeconds(2)) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 1, 0L, "key", "value")) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 1L, "key", "value")) + + assertEquals( + 0.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.partitions.pending") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + 0.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.state") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.completed") + .tag("listener", KafkaListenerIds.ENTITY) + .counter() + .count(), + ) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.completed") + .tag("listener", "all") + .counter() + .count(), + ) + } + + @Test + fun `should count end offset lookup failures`() { + val partition0 = TopicPartition("topic", 0) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } throws RuntimeException("boom") + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.end_offset.lookup.failures") + .tag("listener", KafkaListenerIds.ENTITY) + .counter() + .count(), + ) + } +} diff --git a/src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt b/src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt new file mode 100644 index 00000000..4b6bd637 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt @@ -0,0 +1,141 @@ +package no.fintlabs.consumer.health + +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import io.mockk.mockk +import org.apache.kafka.clients.consumer.Consumer +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.springframework.boot.actuate.health.Status +import org.springframework.kafka.event.NonResponsiveConsumerEvent +import java.time.Duration +import java.time.Instant + +class KafkaRuntimeHealthMonitorTest { + @Test + fun `should stay up during grace period for non responsive consumer`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val monitor = + KafkaRuntimeHealthMonitor( + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + clock, + KafkaHealthMetrics( + SimpleMeterRegistry(), + clock, + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + ), + ) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onRecordProcessed(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + + clock.advance(Duration.ofMinutes(14)) + + assertEquals(Status.UP, monitor.health().status) + } + + @Test + fun `should go down after grace period for non responsive consumer`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val monitor = + KafkaRuntimeHealthMonitor( + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + clock, + KafkaHealthMetrics( + SimpleMeterRegistry(), + clock, + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + ), + ) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onRecordProcessed(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + + clock.advance(Duration.ofMinutes(16)) + + assertEquals(Status.DOWN, monitor.health().status) + } + + @Test + fun `should recover after healthy activity resumes`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val monitor = + KafkaRuntimeHealthMonitor( + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + clock, + KafkaHealthMetrics( + SimpleMeterRegistry(), + clock, + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + ), + ) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + clock.advance(Duration.ofMinutes(5)) + + monitor.onRecordProcessed(KafkaListenerIds.ENTITY) + clock.advance(Duration.ofMinutes(20)) + + assertEquals(Status.UP, monitor.health().status) + } + + @Test + fun `should publish runtime metrics for problem and unhealthy state`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val meterRegistry = SimpleMeterRegistry() + val properties = KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)) + val monitor = KafkaRuntimeHealthMonitor(properties, clock, KafkaHealthMetrics(meterRegistry, clock, properties)) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.runtime.problem") + .tag("listener", KafkaListenerIds.ENTITY) + .tag("reason", "NON_RESPONSIVE") + .counter() + .count(), + ) + assertEquals( + 0.0, + meterRegistry + .get("fint.consumer.kafka.runtime.unhealthy") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + + clock.advance(Duration.ofMinutes(16)) + + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.runtime.unhealthy") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + Duration.ofMinutes(16).toMillis().toDouble(), + meterRegistry + .get("fint.consumer.kafka.runtime.problem.duration") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + } +} + +private fun nonResponsiveConsumerEvent(listenerId: String): NonResponsiveConsumerEvent = + NonResponsiveConsumerEvent( + Any(), + Any(), + 1_000L, + listenerId, + emptyList(), + mockk>(relaxed = true), + ) diff --git a/src/test/java/no/fintlabs/consumer/health/MutableClock.kt b/src/test/java/no/fintlabs/consumer/health/MutableClock.kt new file mode 100644 index 00000000..7dd866f5 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/MutableClock.kt @@ -0,0 +1,21 @@ +package no.fintlabs.consumer.health + +import java.time.Clock +import java.time.Duration +import java.time.Instant +import java.time.ZoneId +import java.time.ZoneOffset + +class MutableClock( + private var instant: Instant, +) : Clock() { + override fun getZone(): ZoneId = ZoneOffset.UTC + + override fun withZone(zone: ZoneId): Clock = this + + override fun instant(): Instant = instant + + fun advance(duration: Duration) { + instant = instant.plus(duration) + } +} diff --git a/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt b/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt index c54442c2..b335dfa0 100644 --- a/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt +++ b/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt @@ -8,6 +8,9 @@ import no.fintlabs.autorelation.kafka.RelationUpdateConsumer import no.fintlabs.autorelation.model.RelationUpdate import no.fintlabs.autorelation.model.createEntityDescriptor import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaThroughputMetrics import org.apache.kafka.clients.consumer.ConsumerRecord import org.junit.jupiter.api.BeforeEach @@ -20,6 +23,9 @@ class RelationUpdateConsumerTest { private lateinit var consumerRecord: ConsumerRecord private lateinit var relationUpdate: RelationUpdate private lateinit var kafkaThroughputMetrics: KafkaThroughputMetrics + private lateinit var initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker + private lateinit var kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor + private lateinit var kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer @BeforeEach fun setUp() { @@ -32,7 +38,18 @@ class RelationUpdateConsumerTest { } kafkaThroughputMetrics = mockk(relaxed = true) - relationUpdateConsumer = RelationUpdateConsumer(autoRelationService, consumerConfig, kafkaThroughputMetrics) + initialKafkaBootstrapTracker = mockk(relaxed = true) + kafkaRuntimeHealthMonitor = mockk(relaxed = true) + kafkaListenerContainerHealthConfigurer = mockk(relaxed = true) + relationUpdateConsumer = + RelationUpdateConsumer( + autoRelationService, + consumerConfig, + kafkaThroughputMetrics, + initialKafkaBootstrapTracker, + kafkaRuntimeHealthMonitor, + kafkaListenerContainerHealthConfigurer, + ) } @Test From 789880ce0cf7843fbb817cdfb07fc26dcf4f47a4 Mon Sep 17 00:00:00 2001 From: Hknots Date: Mon, 20 Apr 2026 15:36:19 +0200 Subject: [PATCH 02/10] refactor: remove unused legacy resource topics configuration --- .../java/no/fintlabs/consumer/config/ConsumerConfiguration.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/no/fintlabs/consumer/config/ConsumerConfiguration.kt b/src/main/java/no/fintlabs/consumer/config/ConsumerConfiguration.kt index 35df462f..b2314970 100644 --- a/src/main/java/no/fintlabs/consumer/config/ConsumerConfiguration.kt +++ b/src/main/java/no/fintlabs/consumer/config/ConsumerConfiguration.kt @@ -46,7 +46,6 @@ data class ConsumerConfiguration( // TODO: Cleanup configuration data class KafkaConfiguration( // Entity consumption in EntityConsumer & AutoRelationEntityConsumer - val consumeLegacyResourceTopics: Boolean = false, val entityConcurrency: Int = 1, val relationEntitySeekToBeginning: Boolean = false, val fetchMinBytes: Int = 65536, From 392f8d79af49532bc65de89b31ca0d36d73d1568 Mon Sep 17 00:00:00 2001 From: Hknots Date: Mon, 20 Apr 2026 15:36:59 +0200 Subject: [PATCH 03/10] test: use health metric arguments & remove legacy resource topic tests --- .../consumer/kafka/entity/EntityConsumer.kt | 2 - .../kafka/entity/EntityConsumerTest.kt | 98 +++++-------------- .../kafka/event/EventResponseConsumerTest.kt | 14 ++- .../event/RequestFintEventConsumerTest.kt | 16 ++- 4 files changed, 50 insertions(+), 80 deletions(-) diff --git a/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt index c9402100..d68fef17 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt @@ -16,7 +16,6 @@ import no.novari.kafka.consuming.ParameterizedListenerContainerFactoryService import no.novari.kafka.topic.name.EntityTopicNamePatternParameters import no.novari.kafka.topic.name.TopicNamePatternParameterPattern import no.novari.kafka.topic.name.TopicNamePatternPrefixParameters -import no.novari.metamodel.MetamodelService import org.apache.kafka.clients.consumer.ConsumerRecord import org.slf4j.LoggerFactory import org.springframework.context.annotation.Bean @@ -31,7 +30,6 @@ class EntityConsumer( private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, - private val metamodelService: MetamodelService, ) { companion object { private val logger = LoggerFactory.getLogger(EntityConsumer::class.java) diff --git a/src/test/java/no/fintlabs/consumer/kafka/entity/EntityConsumerTest.kt b/src/test/java/no/fintlabs/consumer/kafka/entity/EntityConsumerTest.kt index bda560ec..e9ee11b9 100644 --- a/src/test/java/no/fintlabs/consumer/kafka/entity/EntityConsumerTest.kt +++ b/src/test/java/no/fintlabs/consumer/kafka/entity/EntityConsumerTest.kt @@ -7,6 +7,9 @@ import io.mockk.verify import no.fintlabs.consumer.config.ConsumerConfiguration import no.fintlabs.consumer.config.KafkaConfiguration import no.fintlabs.consumer.config.OrgId +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConstants.LAST_MODIFIED import no.fintlabs.consumer.kafka.KafkaConstants.RESOURCE_NAME import no.fintlabs.consumer.resource.ResourceConverter @@ -16,9 +19,6 @@ import no.novari.kafka.consuming.ParameterizedListenerContainerFactory import no.novari.kafka.consuming.ParameterizedListenerContainerFactoryService import no.novari.kafka.topic.name.EntityTopicNamePatternParameters import no.novari.kafka.topic.name.TopicNamePatternParameters -import no.novari.metamodel.MetamodelService -import no.novari.metamodel.model.Component -import no.novari.metamodel.model.Resource import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord.NULL_SIZE @@ -42,11 +42,13 @@ class EntityConsumerTest { private lateinit var entityProcessingService: EntityProcessingService private lateinit var consumerConfig: ConsumerConfiguration private lateinit var resourceConverter: ResourceConverter - private lateinit var metamodelService: MetamodelService private lateinit var factoryService: ParameterizedListenerContainerFactoryService private lateinit var errorHandlerFactory: ErrorHandlerFactory private lateinit var factory: ParameterizedListenerContainerFactory private lateinit var container: ConcurrentMessageListenerContainer + private lateinit var initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker + private lateinit var kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor + private lateinit var kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer private lateinit var entityConsumer: EntityConsumer @BeforeEach @@ -54,10 +56,12 @@ class EntityConsumerTest { entityProcessingService = mockk(relaxed = true) consumerConfig = mockk() resourceConverter = mockk(relaxed = true) - metamodelService = mockk() factoryService = mockk() errorHandlerFactory = mockk(relaxed = true) factory = mockk() + initialKafkaBootstrapTracker = mockk(relaxed = true) + kafkaRuntimeHealthMonitor = mockk(relaxed = true) + kafkaListenerContainerHealthConfigurer = mockk(relaxed = true) container = ConcurrentMessageListenerContainer( mockk>(relaxed = true), @@ -79,12 +83,20 @@ class EntityConsumerTest { } returns factory every { factory.createContainer(any()) } returns container - entityConsumer = EntityConsumer(entityProcessingService, consumerConfig, resourceConverter, metamodelService) + entityConsumer = + EntityConsumer( + entityProcessingService, + consumerConfig, + resourceConverter, + initialKafkaBootstrapTracker, + kafkaRuntimeHealthMonitor, + kafkaListenerContainerHealthConfigurer, + ) } @Test - fun `when consumeLegacyResourceTopics is disabled, only component topic is consumed`() { - every { consumerConfig.kafka } returns KafkaConfiguration(consumeLegacyResourceTopics = false) + fun `only component topic is consumed`() { + every { consumerConfig.kafka } returns KafkaConfiguration() val captured = slot() every { factory.createContainer(capture(captured)) } returns container @@ -98,33 +110,6 @@ class EntityConsumerTest { assertEquals(listOf("utdanning-vurdering"), resourcePattern.anyOfValues) } - @Test - fun `when consumeLegacyResourceTopics is enabled, component topic and one topic per resource are consumed`() { - every { consumerConfig.kafka } returns KafkaConfiguration(consumeLegacyResourceTopics = true) - - val component = mockk() - val resource1 = mockk() - val resource2 = mockk() - every { resource1.name } returns "elevfravar" - every { resource2.name } returns "eksamenskarakter" - every { component.resources } returns listOf(resource1, resource2) - every { metamodelService.getComponent("utdanning", "vurdering") } returns component - - val captured = slot() - every { factory.createContainer(capture(captured)) } returns container - - entityConsumer.resourceEntityConsumerFactory(factoryService, errorHandlerFactory) - - val resourcePattern = - captured.captured.topicNamePatternSuffixParameters - .first() - .pattern - assertEquals(3, resourcePattern.anyOfValues.size) - assertTrue(resourcePattern.anyOfValues.contains("utdanning-vurdering")) - assertTrue(resourcePattern.anyOfValues.contains("utdanning-vurdering-elevfravar")) - assertTrue(resourcePattern.anyOfValues.contains("utdanning-vurdering-eksamenskarakter")) - } - @Test fun `container gets fetch and idle settings from consumer configuration`() { every { @@ -163,8 +148,8 @@ class EntityConsumerTest { } @Test - fun `when consumeLegacyResourceTopics is disabled and header is present, resource name is read from header`() { - every { consumerConfig.kafka } returns KafkaConfiguration(consumeLegacyResourceTopics = false) + fun `when header is present, resource name is read from header`() { + every { consumerConfig.kafka } returns KafkaConfiguration() val captured = slot() every { entityProcessingService.processEntityConsumerRecord(capture(captured)) } returns Unit @@ -180,49 +165,14 @@ class EntityConsumerTest { } @Test - fun `when consumeLegacyResourceTopics is disabled and header is missing, an exception is thrown`() { - every { consumerConfig.kafka } returns KafkaConfiguration(consumeLegacyResourceTopics = false) + fun `when header is missing, an exception is thrown`() { + every { consumerConfig.kafka } returns KafkaConfiguration() assertThrows { entityConsumer.consumeRecord(createConsumerRecord(topic = "utdanning-vurdering", resourceNameHeader = null)) } } - @Test - fun `when consumeLegacyResourceTopics is enabled and header is present, resource name is read from header`() { - every { consumerConfig.kafka } returns KafkaConfiguration(consumeLegacyResourceTopics = true) - - val captured = slot() - every { entityProcessingService.processEntityConsumerRecord(capture(captured)) } returns Unit - - entityConsumer.consumeRecord( - createConsumerRecord( - topic = "utdanning-vurdering-elevfravar", - resourceNameHeader = "elevfravar", - ), - ) - - assertEquals("elevfravar", captured.captured.resourceName) - } - - @Suppress("ktlint:standard:max-line-length") - @Test - fun `when consumeLegacyResourceTopics is enabled and header is missing, resource name falls back to last topic segment`() { - every { consumerConfig.kafka } returns KafkaConfiguration(consumeLegacyResourceTopics = true) - - val captured = slot() - every { entityProcessingService.processEntityConsumerRecord(capture(captured)) } returns Unit - - entityConsumer.consumeRecord( - createConsumerRecord( - topic = "utdanning-vurdering-elevfravar", - resourceNameHeader = null, - ), - ) - - assertEquals("elevfravar", captured.captured.resourceName) - } - @Test fun `listener configuration seeks to beginning on partition assignment`() { val config = captureListenerConfig() diff --git a/src/test/java/no/fintlabs/consumer/kafka/event/EventResponseConsumerTest.kt b/src/test/java/no/fintlabs/consumer/kafka/event/EventResponseConsumerTest.kt index 7ec5d0a5..60d3d53b 100644 --- a/src/test/java/no/fintlabs/consumer/kafka/event/EventResponseConsumerTest.kt +++ b/src/test/java/no/fintlabs/consumer/kafka/event/EventResponseConsumerTest.kt @@ -8,6 +8,8 @@ import no.fintlabs.adapter.models.event.ResponseFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration import no.fintlabs.consumer.config.KafkaConfiguration import no.fintlabs.consumer.config.OrgId +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.resource.event.EventStatusCache import no.novari.kafka.consuming.ErrorHandlerFactory import no.novari.kafka.consuming.ListenerConfiguration @@ -30,6 +32,8 @@ class EventResponseConsumerTest { private lateinit var errorHandlerFactory: ErrorHandlerFactory private lateinit var factory: ParameterizedListenerContainerFactory private lateinit var container: ConcurrentMessageListenerContainer + private lateinit var kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor + private lateinit var kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer private lateinit var eventResponseConsumer: EventResponseConsumer @BeforeEach @@ -40,6 +44,8 @@ class EventResponseConsumerTest { errorHandlerFactory = mockk(relaxed = true) factory = mockk() container = mockk(relaxed = true) + kafkaRuntimeHealthMonitor = mockk(relaxed = true) + kafkaListenerContainerHealthConfigurer = mockk(relaxed = true) every { consumerConfig.orgId } returns OrgId.from("foo.bar") every { consumerConfig.domain } returns "utdanning" @@ -57,7 +63,13 @@ class EventResponseConsumerTest { } returns factory every { factory.createContainer(any()) } returns container - eventResponseConsumer = EventResponseConsumer(consumerConfig, eventStatusCache) + eventResponseConsumer = + EventResponseConsumer( + consumerConfig, + eventStatusCache, + kafkaRuntimeHealthMonitor, + kafkaListenerContainerHealthConfigurer, + ) } @Test diff --git a/src/test/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumerTest.kt b/src/test/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumerTest.kt index 93759c8f..69e8a17d 100644 --- a/src/test/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumerTest.kt +++ b/src/test/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumerTest.kt @@ -8,6 +8,8 @@ import no.fintlabs.adapter.models.event.RequestFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration import no.fintlabs.consumer.config.KafkaConfiguration import no.fintlabs.consumer.config.OrgId +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.resource.event.EventStatusCache import no.novari.kafka.consuming.ErrorHandlerFactory import no.novari.kafka.consuming.ListenerConfiguration @@ -18,10 +20,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow import org.springframework.kafka.listener.ConcurrentMessageListenerContainer import org.springframework.kafka.listener.ConsumerSeekAware -import java.util.UUID import java.util.function.Consumer import kotlin.test.assertTrue @@ -33,6 +33,8 @@ class RequestFintEventConsumerTest { private lateinit var factory: ParameterizedListenerContainerFactory private lateinit var container: ConcurrentMessageListenerContainer private lateinit var requestFintEventConsumer: RequestFintEventConsumer + private lateinit var kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor + private lateinit var kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer @BeforeEach fun setUp() { @@ -42,6 +44,8 @@ class RequestFintEventConsumerTest { errorHandlerFactory = mockk(relaxed = true) factory = mockk() container = mockk(relaxed = true) + kafkaRuntimeHealthMonitor = mockk(relaxed = true) + kafkaListenerContainerHealthConfigurer = mockk(relaxed = true) every { consumerConfig.orgId } returns OrgId.from("foo.bar") every { consumerConfig.domain } returns "utdanning" @@ -59,7 +63,13 @@ class RequestFintEventConsumerTest { } returns factory every { factory.createContainer(any()) } returns container - requestFintEventConsumer = RequestFintEventConsumer(consumerConfig, eventStatusCache) + requestFintEventConsumer = + RequestFintEventConsumer( + consumerConfig, + eventStatusCache, + kafkaRuntimeHealthMonitor, + kafkaListenerContainerHealthConfigurer, + ) } @Test From 397b8753067bc8a69172c57526fbfacafd1a4cc6 Mon Sep 17 00:00:00 2001 From: Hknots Date: Mon, 20 Apr 2026 15:37:44 +0200 Subject: [PATCH 04/10] test: remove unnecessary LegacyResourceTopic integration tests --- .../integration/LegacyResourceTopicIT.kt | 138 ------------------ 1 file changed, 138 deletions(-) delete mode 100644 src/integrationTest/java/no/fintlabs/consumer/integration/LegacyResourceTopicIT.kt diff --git a/src/integrationTest/java/no/fintlabs/consumer/integration/LegacyResourceTopicIT.kt b/src/integrationTest/java/no/fintlabs/consumer/integration/LegacyResourceTopicIT.kt deleted file mode 100644 index 8b35abb3..00000000 --- a/src/integrationTest/java/no/fintlabs/consumer/integration/LegacyResourceTopicIT.kt +++ /dev/null @@ -1,138 +0,0 @@ -package no.fintlabs.consumer.integration - -import com.fasterxml.jackson.databind.ObjectMapper -import no.fintlabs.Application -import no.fintlabs.adapter.models.sync.SyncType -import no.fintlabs.cache.CacheService -import no.fintlabs.utils.EntityProducer -import no.novari.fint.model.felles.kompleksedatatyper.Identifikator -import no.novari.fint.model.resource.Link -import no.novari.fint.model.resource.utdanning.timeplan.FagResource -import org.awaitility.kotlin.await -import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.Test -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.boot.test.web.server.LocalServerPort -import org.springframework.kafka.test.context.EmbeddedKafka -import org.springframework.test.annotation.DirtiesContext -import org.springframework.test.context.TestPropertySource -import org.springframework.test.web.reactive.server.WebTestClient -import java.time.Clock -import java.time.Duration -import java.util.UUID -import java.util.concurrent.TimeUnit -import kotlin.test.assertEquals - -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = [Application::class]) -@EmbeddedKafka(partitions = 1) -@TestPropertySource( - properties = [ - "spring.kafka.bootstrap-servers=\${spring.embedded.kafka.brokers}", - "spring.kafka.consumer.auto-offset-reset=earliest", - "spring.kafka.consumer.group-id=legacy-resource-topic-it", - "novari.kafka.default-replicas=1", - "fint.relation.base-url=https://foo.org", - "fint.org-id=foo.org", - "fint.consumer.domain=utdanning", - "fint.consumer.package=timeplan", - "fint.consumer.kafka.consume-legacy-resource-topics=true", - "fint.security.enabled=false", - ], -) -@DirtiesContext -class LegacyResourceTopicIT { - @LocalServerPort - private var port: Int = 0 - - private val client by lazy { - WebTestClient - .bindToServer() - .baseUrl("http://localhost:$port/utdanning/timeplan") - .responseTimeout(Duration.ofSeconds(10)) - .build() - } - - @Autowired - lateinit var objectMapper: ObjectMapper - - @Autowired - lateinit var cacheService: CacheService - - @Autowired - lateinit var entityProducer: EntityProducer - - private val clock: Clock = Clock.systemUTC() - - @AfterEach - fun tearDown() { - cacheService.getCache("fag").evictExpired(Long.MAX_VALUE) - } - - @Test - fun `resource published to legacy topic with resource-name header is cached`() { - entityProducer - .publishToLegacyResourceTopic( - resourceName = "fag", - resource = createFag("1", "Fag 1"), - resourceId = "1", - syncType = SyncType.FULL, - syncCorrId = UUID.randomUUID().toString(), - syncTotalSize = 1, - timestamp = clock.millis(), - includeResourceNameHeader = true, - ).get(10, TimeUnit.SECONDS) - - await.atMost(Duration.ofSeconds(30)).untilAsserted { - val resources = fetchAllFagResources() - assertEquals(1, resources.size) - assertEquals("Fag 1", resources.first().navn) - } - } - - @Test - fun `resource published to legacy topic without resource-name header falls back to topic name and is cached`() { - entityProducer - .publishToLegacyResourceTopic( - resourceName = "fag", - resource = createFag("2", "Fag 2"), - resourceId = "2", - syncType = SyncType.FULL, - syncCorrId = UUID.randomUUID().toString(), - syncTotalSize = 1, - timestamp = clock.millis(), - includeResourceNameHeader = false, - ).get(10, TimeUnit.SECONDS) - - await.atMost(Duration.ofSeconds(30)).untilAsserted { - val resources = fetchAllFagResources() - assertEquals(1, resources.size) - assertEquals("Fag 2", resources.first().navn) - } - } - - private fun createFag( - id: String, - navn: String, - ): FagResource { - val fag = FagResource() - fag.systemId = Identifikator().apply { identifikatorverdi = "systemid-fag-$id" } - fag.navn = navn - fag.links["self"] = listOf(Link("https://foo.org/utdanning/timeplan/fag/systemid/$id")) - return fag - } - - private fun fetchAllFagResources(): List { - val page = - client - .get() - .uri("/fag") - .exchange() - .expectStatus() - .isOk - .expectBody(FintResourcesPage::class.java) - .returnResult() - .responseBody ?: return emptyList() - return page.getResources(objectMapper, FagResource::class.java) - } -} From c0d60a108d386639f4e12a71c0834639cf607ea1 Mon Sep 17 00:00:00 2001 From: Hknots Date: Tue, 21 Apr 2026 08:43:45 +0200 Subject: [PATCH 05/10] feat: block readiness until request and response event topics are drained Register REQUEST_EVENT and RESPONSE_EVENT consumers with InitialKafkaBootstrapTracker so readiness stays OUT_OF_SERVICE until both topics catch up to their assignment-time end offsets. Previously only ENTITY and RELATION_UPDATE gated bootstrap. Co-Authored-By: Claude Opus 4.7 --- .../consumer/kafka/event/EventResponseConsumer.kt | 14 ++++++++++++-- .../kafka/event/RequestFintEventConsumer.kt | 14 ++++++++++++-- .../kafka/event/EventResponseConsumerTest.kt | 4 ++++ .../kafka/event/RequestFintEventConsumerTest.kt | 4 ++++ 4 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt index 71bcc502..62aa28ad 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt @@ -2,6 +2,7 @@ package no.fintlabs.consumer.kafka.event import no.fintlabs.adapter.models.event.ResponseFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer import no.fintlabs.consumer.health.KafkaListenerIds import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor @@ -23,6 +24,7 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class EventResponseConsumer( private val consumerConfig: ConsumerConfiguration, private val eventStatusCache: EventStatusCache, + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { @@ -31,6 +33,7 @@ class EventResponseConsumer( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, ): ConcurrentMessageListenerContainer { + initialKafkaBootstrapTracker.registerBlockingListener(KafkaListenerIds.RESPONSE_EVENT) kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.RESPONSE_EVENT) return parameterizedListenerContainerFactoryService @@ -42,8 +45,14 @@ class EventResponseConsumer( .groupIdApplicationDefaultWithUniqueSuffix() .maxPollRecordsKafkaDefault() .maxPollIntervalKafkaDefault() - .seekToBeginningOnAssignment() - .build(), + .seekToBeginningAndPerformOperationOnAssignment { assignments -> + initialKafkaBootstrapTracker.onPartitionsAssigned( + KafkaListenerIds.RESPONSE_EVENT, + assignments.keys, + ) + }.onRevocation { partitions -> + initialKafkaBootstrapTracker.onPartitionsRevoked(KafkaListenerIds.RESPONSE_EVENT, partitions) + }.build(), errorHandlerFactory.createErrorHandler( KafkaConsumerErrorHandling.createLoggingErrorHandlerConfiguration( logger, @@ -73,6 +82,7 @@ class EventResponseConsumer( private fun consumeRecord(consumerRecord: ConsumerRecord) { logger.info("Received Response: {}", consumerRecord.value()) eventStatusCache.trackResponse(consumerRecord.value().corrId, consumerRecord.value()) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.RESPONSE_EVENT, consumerRecord) kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RESPONSE_EVENT) } diff --git a/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt index 70908e59..5349a37d 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt @@ -2,6 +2,7 @@ package no.fintlabs.consumer.kafka.event import no.fintlabs.adapter.models.event.RequestFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer import no.fintlabs.consumer.health.KafkaListenerIds import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor @@ -23,6 +24,7 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class RequestFintEventConsumer( private val consumerConfig: ConsumerConfiguration, private val eventStatusCache: EventStatusCache, + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { @@ -36,6 +38,7 @@ class RequestFintEventConsumer( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, ): ConcurrentMessageListenerContainer { + initialKafkaBootstrapTracker.registerBlockingListener(KafkaListenerIds.REQUEST_EVENT) kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.REQUEST_EVENT) return parameterizedListenerContainerFactoryService @@ -47,8 +50,14 @@ class RequestFintEventConsumer( .groupIdApplicationDefaultWithUniqueSuffix() .maxPollRecordsKafkaDefault() .maxPollIntervalKafkaDefault() - .seekToBeginningOnAssignment() - .build(), + .seekToBeginningAndPerformOperationOnAssignment { assignments -> + initialKafkaBootstrapTracker.onPartitionsAssigned( + KafkaListenerIds.REQUEST_EVENT, + assignments.keys, + ) + }.onRevocation { partitions -> + initialKafkaBootstrapTracker.onPartitionsRevoked(KafkaListenerIds.REQUEST_EVENT, partitions) + }.build(), errorHandlerFactory.createErrorHandler( KafkaConsumerErrorHandling.createLoggingErrorHandlerConfiguration( logger, @@ -78,6 +87,7 @@ class RequestFintEventConsumer( private fun consumeRecord(consumerRecord: ConsumerRecord) { logger.info("Received Request: {}", consumerRecord.key()) eventStatusCache.trackRequest(consumerRecord.value()) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.REQUEST_EVENT, consumerRecord) kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.REQUEST_EVENT) } } diff --git a/src/test/java/no/fintlabs/consumer/kafka/event/EventResponseConsumerTest.kt b/src/test/java/no/fintlabs/consumer/kafka/event/EventResponseConsumerTest.kt index 60d3d53b..32ebeca4 100644 --- a/src/test/java/no/fintlabs/consumer/kafka/event/EventResponseConsumerTest.kt +++ b/src/test/java/no/fintlabs/consumer/kafka/event/EventResponseConsumerTest.kt @@ -8,6 +8,7 @@ import no.fintlabs.adapter.models.event.ResponseFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration import no.fintlabs.consumer.config.KafkaConfiguration import no.fintlabs.consumer.config.OrgId +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.resource.event.EventStatusCache @@ -32,6 +33,7 @@ class EventResponseConsumerTest { private lateinit var errorHandlerFactory: ErrorHandlerFactory private lateinit var factory: ParameterizedListenerContainerFactory private lateinit var container: ConcurrentMessageListenerContainer + private lateinit var initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker private lateinit var kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor private lateinit var kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer private lateinit var eventResponseConsumer: EventResponseConsumer @@ -44,6 +46,7 @@ class EventResponseConsumerTest { errorHandlerFactory = mockk(relaxed = true) factory = mockk() container = mockk(relaxed = true) + initialKafkaBootstrapTracker = mockk(relaxed = true) kafkaRuntimeHealthMonitor = mockk(relaxed = true) kafkaListenerContainerHealthConfigurer = mockk(relaxed = true) @@ -67,6 +70,7 @@ class EventResponseConsumerTest { EventResponseConsumer( consumerConfig, eventStatusCache, + initialKafkaBootstrapTracker, kafkaRuntimeHealthMonitor, kafkaListenerContainerHealthConfigurer, ) diff --git a/src/test/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumerTest.kt b/src/test/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumerTest.kt index 69e8a17d..735df0e0 100644 --- a/src/test/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumerTest.kt +++ b/src/test/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumerTest.kt @@ -8,6 +8,7 @@ import no.fintlabs.adapter.models.event.RequestFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration import no.fintlabs.consumer.config.KafkaConfiguration import no.fintlabs.consumer.config.OrgId +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.resource.event.EventStatusCache @@ -33,6 +34,7 @@ class RequestFintEventConsumerTest { private lateinit var factory: ParameterizedListenerContainerFactory private lateinit var container: ConcurrentMessageListenerContainer private lateinit var requestFintEventConsumer: RequestFintEventConsumer + private lateinit var initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker private lateinit var kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor private lateinit var kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer @@ -44,6 +46,7 @@ class RequestFintEventConsumerTest { errorHandlerFactory = mockk(relaxed = true) factory = mockk() container = mockk(relaxed = true) + initialKafkaBootstrapTracker = mockk(relaxed = true) kafkaRuntimeHealthMonitor = mockk(relaxed = true) kafkaListenerContainerHealthConfigurer = mockk(relaxed = true) @@ -67,6 +70,7 @@ class RequestFintEventConsumerTest { RequestFintEventConsumer( consumerConfig, eventStatusCache, + initialKafkaBootstrapTracker, kafkaRuntimeHealthMonitor, kafkaListenerContainerHealthConfigurer, ) From 7655e14109d328e0e7bd619f1a1796135a5cf50e Mon Sep 17 00:00:00 2001 From: Hknots Date: Tue, 21 Apr 2026 08:51:02 +0200 Subject: [PATCH 06/10] fix: advance bootstrap tracker on relation update processing failure Spring's error handler skips failed records (noRetries + skipRecordOnRecoveryFailure), so the consumer moves on, but the bootstrap tracker's processedOffset did not. A poison record at the tail of a partition would leave readiness OUT_OF_SERVICE forever. Advance the tracker in the catch before rethrowing. --- .../no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt b/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt index bc1f96ec..f3c5f8ff 100644 --- a/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt +++ b/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt @@ -133,6 +133,7 @@ class RelationUpdateConsumer( "failed", System.nanoTime() - startedAt, ) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE, consumerRecord) kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) throw ex } From 1a409403f125204208f0b44b8d6b0b410e7f966f Mon Sep 17 00:00:00 2001 From: Jan Ove Kongshaug Date: Tue, 21 Apr 2026 09:51:55 +0200 Subject: [PATCH 07/10] Update health check documentation and add Grafana dashboard --- ...-core-consumer-kafka-health-dashboard.json | 1150 +++++++++++++++++ docs/kafka-health-checks.md | 10 +- 2 files changed, 1157 insertions(+), 3 deletions(-) create mode 100644 docs/grafana/fint-core-consumer-kafka-health-dashboard.json diff --git a/docs/grafana/fint-core-consumer-kafka-health-dashboard.json b/docs/grafana/fint-core-consumer-kafka-health-dashboard.json new file mode 100644 index 00000000..375c8047 --- /dev/null +++ b/docs/grafana/fint-core-consumer-kafka-health-dashboard.json @@ -0,0 +1,1150 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": null, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(max by (pod, listener) (fint_consumer_kafka_runtime_unhealthy{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}))", + "instant": true, + "legendFormat": "", + "refId": "A" + } + ], + "title": "Unhealthy Listeners Now", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 6, + "y": 0 + }, + "id": 2, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(max by (pod, listener) (fint_consumer_kafka_bootstrap_state{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}))", + "instant": true, + "legendFormat": "", + "refId": "A" + } + ], + "title": "Listeners Bootstrapping Now", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 12, + "y": 0 + }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(max by (pod, listener) (fint_consumer_kafka_bootstrap_partitions_pending{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}))", + "instant": true, + "legendFormat": "", + "refId": "A" + } + ], + "title": "Pending Bootstrap Partitions Now", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "decimals": 0, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 18, + "y": 0 + }, + "id": 4, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "sum" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(increase(fint_consumer_kafka_runtime_problem_total{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}[24h]))", + "instant": true, + "legendFormat": "", + "refId": "A" + } + ], + "title": "Runtime Problems Last 24h", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 15, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 2, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 5 + }, + "id": 5, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max by (pod, listener) (fint_consumer_kafka_bootstrap_state{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"})", + "legendFormat": "{{pod}} / {{listener}}", + "refId": "A" + } + ], + "title": "Bootstrap State", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 15, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 2, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 5 + }, + "id": 6, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max by (pod, listener) (fint_consumer_kafka_bootstrap_partitions_pending{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"})", + "legendFormat": "{{pod}} / {{listener}}", + "refId": "A" + } + ], + "title": "Pending Bootstrap Partitions", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 15, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 2, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 13 + }, + "id": 7, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max by (pod, listener) (fint_consumer_kafka_runtime_unhealthy{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"})", + "legendFormat": "{{pod}} / {{listener}}", + "refId": "A" + } + ], + "title": "Runtime Unhealthy", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 15, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 2, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 60000 + }, + { + "color": "red", + "value": 900000 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 13 + }, + "id": 8, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max by (pod, listener) (fint_consumer_kafka_runtime_problem_duration{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"})", + "legendFormat": "{{pod}} / {{listener}}", + "refId": "A" + } + ], + "title": "Runtime Problem Duration", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "bars", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "normal" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 21 + }, + "id": 9, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum by (listener, reason) (increase(fint_consumer_kafka_runtime_problem_total{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}[24h]))", + "legendFormat": "{{listener}} / {{reason}}", + "refId": "A" + } + ], + "title": "Runtime Problem Events Last 24h", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "bars", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 6, + "x": 12, + "y": 21 + }, + "id": 10, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum by (listener) (increase(fint_consumer_kafka_bootstrap_completed_total{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}[24h]))", + "legendFormat": "{{listener}}", + "refId": "A" + } + ], + "title": "Bootstrap Completed Last 24h", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "bars", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 6, + "x": 18, + "y": 21 + }, + "id": 11, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "(sum by (listener) (increase(fint_consumer_kafka_bootstrap_duration_seconds_sum{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}[24h])))/(clamp_min(sum by (listener) (increase(fint_consumer_kafka_bootstrap_duration_seconds_count{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}[24h])), 1))", + "legendFormat": "{{listener}}", + "refId": "A" + } + ], + "title": "Average Bootstrap Duration Last 24h", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "bars", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 29 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum by (listener) (increase(fint_consumer_kafka_bootstrap_end_offset_lookup_failures_total{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}[24h]))", + "legendFormat": "{{listener}}", + "refId": "A" + } + ], + "title": "Bootstrap End Offset Lookup Failures Last 24h", + "type": "timeseries" + } + ], + "refresh": "30s", + "schemaVersion": 39, + "style": "dark", + "tags": [ + "fint", + "kafka", + "health" + ], + "templating": { + "list": [ + { + "current": { + "selected": true, + "text": "Prometheus", + "value": "Prometheus" + }, + "hide": 0, + "includeAll": false, + "label": "Datasource", + "name": "datasource", + "options": [], + "query": "prometheus", + "refresh": 1, + "regex": "", + "type": "datasource" + }, + { + "allValue": ".*", + "current": { + "selected": true, + "text": "All", + "value": ".*" + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "definition": "label_values(fint_consumer_kafka_runtime_unhealthy, namespace)", + "hide": 0, + "includeAll": true, + "label": "Namespace", + "multi": false, + "name": "namespace", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(fint_consumer_kafka_runtime_unhealthy, namespace)", + "refId": "Prometheus-namespace" + }, + "refresh": 1, + "regex": "", + "type": "query" + }, + { + "allValue": ".*", + "current": { + "selected": true, + "text": "All", + "value": ".*" + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "definition": "label_values(fint_consumer_kafka_runtime_unhealthy{namespace=~\"$namespace\"}, pod)", + "hide": 0, + "includeAll": true, + "label": "Pod", + "multi": true, + "name": "pod", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(fint_consumer_kafka_runtime_unhealthy{namespace=~\"$namespace\"}, pod)", + "refId": "Prometheus-pod" + }, + "refresh": 1, + "regex": "", + "type": "query" + }, + { + "allValue": ".*", + "current": { + "selected": true, + "text": "All", + "value": ".*" + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "definition": "label_values(fint_consumer_kafka_runtime_unhealthy{namespace=~\"$namespace\", pod=~\"$pod\"}, listener)", + "hide": 0, + "includeAll": true, + "label": "Listener", + "multi": true, + "name": "listener", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(fint_consumer_kafka_runtime_unhealthy{namespace=~\"$namespace\", pod=~\"$pod\"}, listener)", + "refId": "Prometheus-listener" + }, + "refresh": 1, + "regex": "", + "type": "query" + } + ] + }, + "time": { + "from": "now-24h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Fint Core Consumer Kafka Health", + "uid": "fint-core-consumer-kafka-health", + "version": 1, + "weekStart": "" +} diff --git a/docs/kafka-health-checks.md b/docs/kafka-health-checks.md index a0379a8f..a213abc9 100644 --- a/docs/kafka-health-checks.md +++ b/docs/kafka-health-checks.md @@ -35,9 +35,11 @@ Readiness skal beskytte trafikk mot en pod som ennå ikke har bygd opp lokal cac Ved oppstart settes readiness til `REFUSING_TRAFFIC`. -To listeners er definert som blokkerende for initial bootstrap: +Følgende listeners er definert som blokkerende for initial bootstrap: - `entity` +- `request-fint-event` +- `event-response` - `relation-update` For hver assigned partition hentes "startup end offset" fra Kafka i det assignment skjer. Deretter spores prosesserte offsets mens records behandles. @@ -62,8 +64,7 @@ Dette er bevisst. Etter at poden er sluppet i trafikk, skal vanlig Kafka-lag ikk Readiness blir `OUT_OF_SERVICE` hvis minst ett av disse forholdene gjelder under oppstart: -- `entity` har ikke konsumert alle sine assigned partitions opp til startup-end-offset. -- `relation-update` har ikke konsumert alle sine assigned partitions opp til startup-end-offset. +- minst én blokkerende listener har ikke konsumert alle sine assigned partitions opp til startup-end-offset. - Kafka end-offset kan ikke hentes for assigned partitions. ### Hva får ikke readiness til å feile @@ -182,6 +183,9 @@ management: I tillegg til actuator-health eksponerer applikasjonen nå Micrometer-metrikker for Kafka-bootstrap og Kafka-runtime-health. Disse er nyttige fordi health-endepunktene bare viser nåværende status, mens metrikker gjør det mulig å følge utvikling over tid i Prometheus og Grafana. +Et eksempel-dashboard for Grafana ligger i [docs/grafana/fint-core-consumer-kafka-health-dashboard.json](/Users/janovekongshaug/Repositories/fint-core-consumer/docs/grafana/fint-core-consumer-kafka-health-dashboard.json). +Dashboardet antar standard Prometheus/Kubernetes-labels som `namespace` og `pod`. Hvis scrape-labels hos dere heter noe annet, må variablene i dashboardet justeres tilsvarende. + ### Bootstrap-metrikker - `fint.consumer.kafka.bootstrap.state` From 8376115467930ecf6092781376e205f0165111f4 Mon Sep 17 00:00:00 2001 From: Jan Ove Kongshaug Date: Tue, 21 Apr 2026 09:53:13 +0200 Subject: [PATCH 08/10] Fix path --- docs/kafka-health-checks.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/kafka-health-checks.md b/docs/kafka-health-checks.md index a213abc9..ed3a693c 100644 --- a/docs/kafka-health-checks.md +++ b/docs/kafka-health-checks.md @@ -183,7 +183,7 @@ management: I tillegg til actuator-health eksponerer applikasjonen nå Micrometer-metrikker for Kafka-bootstrap og Kafka-runtime-health. Disse er nyttige fordi health-endepunktene bare viser nåværende status, mens metrikker gjør det mulig å følge utvikling over tid i Prometheus og Grafana. -Et eksempel-dashboard for Grafana ligger i [docs/grafana/fint-core-consumer-kafka-health-dashboard.json](/Users/janovekongshaug/Repositories/fint-core-consumer/docs/grafana/fint-core-consumer-kafka-health-dashboard.json). +Et eksempel-dashboard for Grafana ligger i [docs/grafana/fint-core-consumer-kafka-health-dashboard.json](/docs/grafana/fint-core-consumer-kafka-health-dashboard.json). Dashboardet antar standard Prometheus/Kubernetes-labels som `namespace` og `pod`. Hvis scrape-labels hos dere heter noe annet, må variablene i dashboardet justeres tilsvarende. ### Bootstrap-metrikker From 67599de3fa27c11cd233a9c9162507eb08687f39 Mon Sep 17 00:00:00 2001 From: Jan Ove Kongshaug Date: Tue, 28 Apr 2026 09:57:11 +0200 Subject: [PATCH 09/10] fix: move bootstrap end-offset lookup off the Kafka rebalance callback AdminClient.listOffsets(...) was called synchronously inside onPartitionsAssigned with a 10s timeout. When the call timed out during pod startup, TimeoutException (a checked exception, not a RuntimeException) escaped the catch block, surfaced as "User rebalance callback throws an error", and DefaultErrorHandler killed the listener container. The tracker now records assignments in-memory and a single-thread ScheduledExecutorService refreshes pending end-offsets in the background with retry on any Exception. Records processed before the offset arrives are buffered so caughtUp evaluates correctly once it lands. Refresh interval and shutdown timeout are exposed via KafkaHealthProperties. --- .../health/BootstrapPartitionStatus.kt | 2 +- .../health/InitialKafkaBootstrapTracker.kt | 148 +++++++++++++++--- .../consumer/health/KafkaHealthProperties.kt | 2 + .../InitialKafkaBootstrapTrackerTest.kt | 90 ++++++++++- 4 files changed, 216 insertions(+), 26 deletions(-) diff --git a/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt b/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt index c283e012..dfd0f69e 100644 --- a/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt +++ b/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt @@ -2,7 +2,7 @@ package no.fintlabs.consumer.health data class BootstrapPartitionStatus( val partition: String, - val endOffset: Long, + val endOffset: Long?, val processedOffset: Long?, val caughtUp: Boolean, ) diff --git a/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt index 2756b390..0ebb326b 100644 --- a/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt +++ b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt @@ -1,5 +1,7 @@ package no.fintlabs.consumer.health +import jakarta.annotation.PostConstruct +import jakarta.annotation.PreDestroy import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.slf4j.LoggerFactory @@ -8,6 +10,10 @@ import org.springframework.boot.availability.ReadinessState import org.springframework.context.ApplicationContext import org.springframework.stereotype.Service import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executors +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference import kotlin.math.max @@ -17,15 +23,43 @@ class InitialKafkaBootstrapTracker( private val endOffsetProvider: EndOffsetProvider, private val applicationContext: ApplicationContext, private val kafkaHealthMetrics: KafkaHealthMetrics, + private val kafkaHealthProperties: KafkaHealthProperties, ) { private val readinessPublished = AtomicReference(null) private val bootstrapCompleted = AtomicBoolean(false) private val blockingListeners = ConcurrentHashMap() + private val executorRef = AtomicReference(null) init { publishReadiness(false) } + @PostConstruct + fun startEndOffsetRefresh() { + val executor = + Executors.newSingleThreadScheduledExecutor { runnable -> + Thread(runnable, "kafka-bootstrap-end-offsets").apply { isDaemon = true } + } + executorRef.set(executor) + val intervalMs = kafkaHealthProperties.bootstrapEndOffsetRefreshInterval.toMillis().coerceAtLeast(1L) + executor.scheduleWithFixedDelay(::tickRefresh, intervalMs, intervalMs, TimeUnit.MILLISECONDS) + } + + @PreDestroy + fun stopEndOffsetRefresh() { + val executor = executorRef.getAndSet(null) ?: return + executor.shutdown() + try { + val timeoutMs = kafkaHealthProperties.bootstrapEndOffsetExecutorShutdownTimeout.toMillis() + if (!executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) { + executor.shutdownNow() + } + } catch (_: InterruptedException) { + executor.shutdownNow() + Thread.currentThread().interrupt() + } + } + fun registerBlockingListener(listenerId: String) { blockingListeners.computeIfAbsent(listenerId) { ListenerBootstrapState() } kafkaHealthMetrics.registerBootstrapListener(listenerId) @@ -40,29 +74,17 @@ class InitialKafkaBootstrapTracker( } val listenerState = blockingListeners[listenerId] ?: return - val endOffsets = - try { - endOffsetProvider.latestOffsets(assignments) - } catch (exception: RuntimeException) { - logger.error( - "Failed to fetch end offsets for listener={} assignments={}", - listenerId, - assignments, - exception, - ) - kafkaHealthMetrics.recordBootstrapEndOffsetLookupFailure(listenerId) - publishReadiness(false) - return - } - listenerState.assignmentSeen.set(true) assignments.forEach { topicPartition -> - listenerState.partitions[topicPartition] = PartitionBootstrapState(endOffsets[topicPartition] ?: 0L) + listenerState.partitions.computeIfAbsent(topicPartition) { + PartitionBootstrapState(endOffset = null) + } } + listenerState.assignmentSeen.set(true) - maybeCompleteListener(listenerId, listenerState) - maybeCompleteBootstrap() kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + + triggerImmediateRefresh() } fun onPartitionsRevoked( @@ -91,7 +113,7 @@ class InitialKafkaBootstrapTracker( val listenerState = blockingListeners[listenerId] ?: return val topicPartition = TopicPartition(record.topic(), record.partition()) listenerState.partitions.computeIfPresent(topicPartition) { _, state -> - state.withOffset(record.offset()) + state.withProcessedOffset(record.offset()) } maybeCompleteListener(listenerId, listenerState) maybeCompleteBootstrap() @@ -127,6 +149,85 @@ class InitialKafkaBootstrapTracker( ) } + internal fun refreshPendingEndOffsets() { + if (bootstrapCompleted.get()) { + return + } + + val pendingByListener = + blockingListeners + .mapValues { (_, state) -> + state.partitions + .entries + .asSequence() + .filter { it.value.endOffset == null } + .map { it.key } + .toSet() + }.filterValues { it.isNotEmpty() } + + if (pendingByListener.isEmpty()) { + return + } + + val allPartitions = pendingByListener.values.flatten().toSet() + val results = + try { + endOffsetProvider.latestOffsets(allPartitions) + } catch (exception: Exception) { + logger.warn( + "End-offset lookup failed for {} partitions across {} listeners; will retry", + allPartitions.size, + pendingByListener.size, + exception, + ) + pendingByListener.keys.forEach(kafkaHealthMetrics::recordBootstrapEndOffsetLookupFailure) + return + } + + var anyApplied = false + pendingByListener.forEach { (listenerId, partitions) -> + val listenerState = blockingListeners[listenerId] ?: return@forEach + var listenerUpdated = false + partitions.forEach { topicPartition -> + val offset = results[topicPartition] ?: return@forEach + listenerState.partitions.computeIfPresent(topicPartition) { _, existing -> + if (existing.endOffset == null) { + listenerUpdated = true + existing.copy(endOffset = offset) + } else { + existing + } + } + } + if (listenerUpdated) { + anyApplied = true + maybeCompleteListener(listenerId, listenerState) + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + } + } + + if (anyApplied) { + maybeCompleteBootstrap() + } + } + + private fun tickRefresh() { + try { + refreshPendingEndOffsets() + } catch (exception: Exception) { + logger.error("Unexpected error during end-offset refresh tick", exception) + } + } + + private fun triggerImmediateRefresh() { + val executor = executorRef.get() ?: return + try { + executor.execute(::tickRefresh) + } catch (_: RejectedExecutionException) { + // Executor is shutting down — the next scheduled tick (if any) will pick this up. + } + } + private fun maybeCompleteListener( listenerId: String, listenerState: ListenerBootstrapState, @@ -181,13 +282,16 @@ private class ListenerBootstrapState { } private data class PartitionBootstrapState( - val endOffset: Long, + val endOffset: Long?, val processedOffset: Long? = null, ) { val caughtUp: Boolean - get() = endOffset == 0L || ((processedOffset ?: -1L) + 1) >= endOffset + get() { + val end = endOffset ?: return false + return end == 0L || ((processedOffset ?: -1L) + 1) >= end + } - fun withOffset(offset: Long): PartitionBootstrapState { + fun withProcessedOffset(offset: Long): PartitionBootstrapState { return copy(processedOffset = processedOffset?.let { max(it, offset) } ?: offset) } } diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt b/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt index 9729bb40..3a4de291 100644 --- a/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt +++ b/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt @@ -9,4 +9,6 @@ data class KafkaHealthProperties( val runtimeGracePeriod: Duration = Duration.ofMinutes(15), val monitorIntervalSeconds: Int = 30, val noPollThreshold: Float = 3.0f, + val bootstrapEndOffsetRefreshInterval: Duration = Duration.ofSeconds(2), + val bootstrapEndOffsetExecutorShutdownTimeout: Duration = Duration.ofSeconds(5), ) diff --git a/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt index 28d0b3b8..c757df6c 100644 --- a/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt +++ b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt @@ -3,6 +3,7 @@ package no.fintlabs.consumer.health import io.micrometer.core.instrument.simple.SimpleMeterRegistry import io.mockk.every import io.mockk.mockk +import io.mockk.verify import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.junit.jupiter.api.Assertions.assertEquals @@ -13,6 +14,7 @@ import org.junit.jupiter.api.Test import org.springframework.context.ApplicationContext import java.time.Duration import java.time.Instant +import java.util.concurrent.TimeoutException class InitialKafkaBootstrapTrackerTest { private val endOffsetProvider: EndOffsetProvider = mockk() @@ -25,7 +27,13 @@ class InitialKafkaBootstrapTrackerTest { @BeforeEach fun setUp() { - tracker = InitialKafkaBootstrapTracker(endOffsetProvider, applicationContext, kafkaHealthMetrics) + tracker = + InitialKafkaBootstrapTracker( + endOffsetProvider = endOffsetProvider, + applicationContext = applicationContext, + kafkaHealthMetrics = kafkaHealthMetrics, + kafkaHealthProperties = KafkaHealthProperties(), + ) tracker.registerBlockingListener(KafkaListenerIds.ENTITY) } @@ -38,6 +46,7 @@ class InitialKafkaBootstrapTrackerTest { mapOf(partition0 to 2L, partition1 to 1L) tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0, partition1)) + tracker.refreshPendingEndOffsets() assertFalse(tracker.snapshot().ready) @@ -59,6 +68,7 @@ class InitialKafkaBootstrapTrackerTest { mapOf(partition0 to 0L) tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + tracker.refreshPendingEndOffsets() assertTrue(tracker.snapshot().ready) } @@ -72,11 +82,13 @@ class InitialKafkaBootstrapTrackerTest { every { endOffsetProvider.latestOffsets(setOf(partition1)) } returns mapOf(partition1 to 2L) tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + tracker.refreshPendingEndOffsets() tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) assertTrue(tracker.snapshot().ready) tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition1)) + tracker.refreshPendingEndOffsets() assertTrue(tracker.snapshot().ready) } @@ -92,7 +104,9 @@ class InitialKafkaBootstrapTrackerTest { every { endOffsetProvider.latestOffsets(setOf(relationPartition)) } returns mapOf(relationPartition to 1L) tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(entityPartition)) + tracker.refreshPendingEndOffsets() tracker.onPartitionsAssigned(KafkaListenerIds.RELATION_UPDATE, setOf(relationPartition)) + tracker.refreshPendingEndOffsets() tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("entity-topic", 0, 0L, "key", "value")) assertFalse(tracker.snapshot().ready) @@ -114,6 +128,7 @@ class InitialKafkaBootstrapTrackerTest { mapOf(partition0 to 2L, partition1 to 1L) tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0, partition1)) + tracker.refreshPendingEndOffsets() assertEquals( 2.0, @@ -172,13 +187,16 @@ class InitialKafkaBootstrapTrackerTest { } @Test - fun `should count end offset lookup failures`() { + fun `should count end offset lookup failures and recover on retry`() { val partition0 = TopicPartition("topic", 0) - every { endOffsetProvider.latestOffsets(setOf(partition0)) } throws RuntimeException("boom") + every { endOffsetProvider.latestOffsets(setOf(partition0)) } throws + RuntimeException("boom") andThen mapOf(partition0 to 1L) tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + tracker.refreshPendingEndOffsets() + assertFalse(tracker.snapshot().ready) assertEquals( 1.0, meterRegistry @@ -187,5 +205,71 @@ class InitialKafkaBootstrapTrackerTest { .counter() .count(), ) + + tracker.refreshPendingEndOffsets() + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should keep consumer alive when end offset lookup throws checked exception`() { + val partition0 = TopicPartition("topic", 0) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } throws + TimeoutException("kafka admin timeout") andThen mapOf(partition0 to 0L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + tracker.refreshPendingEndOffsets() + + assertFalse(tracker.snapshot().ready) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.end_offset.lookup.failures") + .tag("listener", KafkaListenerIds.ENTITY) + .counter() + .count(), + ) + + tracker.refreshPendingEndOffsets() + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should preserve records processed before end offset arrives`() { + val partition0 = TopicPartition("topic", 0) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } returns mapOf(partition0 to 6L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 5L, "key", "value")) + + assertFalse(tracker.snapshot().ready) + + tracker.refreshPendingEndOffsets() + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should skip end offset lookup for partitions revoked before refresh`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition1)) } returns mapOf(partition1 to 0L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0, partition1)) + tracker.onPartitionsRevoked(KafkaListenerIds.ENTITY, listOf(partition0)) + tracker.refreshPendingEndOffsets() + + assertTrue(tracker.snapshot().ready) + verify { endOffsetProvider.latestOffsets(setOf(partition1)) } + } + + @Test + fun `should not call end offset provider when no partitions are pending`() { + tracker.refreshPendingEndOffsets() + verify(exactly = 0) { endOffsetProvider.latestOffsets(any()) } } } From d63c6d3e94005d44a6e07cefc198ff370c73f657 Mon Sep 17 00:00:00 2001 From: Jan Ove Kongshaug Date: Wed, 29 Apr 2026 08:27:36 +0200 Subject: [PATCH 10/10] fix: reuse library-provided AdminClient for end-offset lookups KafkaAdminEndOffsetProvider built its own AdminClient via KafkaProperties.buildAdminProperties(null), which produced a different effective config than the consumer pipeline (no SslBundles resolution, missing the securityProps map that no.novari.kafka.KafkaConfiguration populates). In prod this manifested as every listOffsets call hanging the full 10s and timing out, even though consumers on the same broker connected fine. Inject the AdminClient bean from the library so admin and consumer share one configuration and one lifecycle. --- .../no/fintlabs/consumer/health/EndOffsetProvider.kt | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt b/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt index 88268de1..c6ed1135 100644 --- a/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt +++ b/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt @@ -1,10 +1,8 @@ package no.fintlabs.consumer.health -import jakarta.annotation.PreDestroy import org.apache.kafka.clients.admin.AdminClient import org.apache.kafka.clients.admin.OffsetSpec import org.apache.kafka.common.TopicPartition -import org.springframework.boot.autoconfigure.kafka.KafkaProperties import org.springframework.stereotype.Component import java.time.Duration import java.util.concurrent.TimeUnit @@ -15,10 +13,8 @@ interface EndOffsetProvider { @Component class KafkaAdminEndOffsetProvider( - kafkaProperties: KafkaProperties, + private val adminClient: AdminClient, ) : EndOffsetProvider { - private val adminClient = AdminClient.create(kafkaProperties.buildAdminProperties(null)) - override fun latestOffsets(partitions: Set): Map { if (partitions.isEmpty()) { return emptyMap() @@ -31,11 +27,6 @@ class KafkaAdminEndOffsetProvider( .mapValues { (_, result) -> result.offset() } } - @PreDestroy - fun close() { - adminClient.close(TIMEOUT) - } - companion object { private val TIMEOUT = Duration.ofSeconds(10) }