diff --git a/docs/grafana/fint-core-consumer-kafka-health-dashboard.json b/docs/grafana/fint-core-consumer-kafka-health-dashboard.json new file mode 100644 index 00000000..375c8047 --- /dev/null +++ b/docs/grafana/fint-core-consumer-kafka-health-dashboard.json @@ -0,0 +1,1150 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": null, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(max by (pod, listener) (fint_consumer_kafka_runtime_unhealthy{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}))", + "instant": true, + "legendFormat": "", + "refId": "A" + } + ], + "title": "Unhealthy Listeners Now", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 6, + "y": 0 + }, + "id": 2, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(max by (pod, listener) (fint_consumer_kafka_bootstrap_state{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}))", + "instant": true, + "legendFormat": "", + "refId": "A" + } + ], + "title": "Listeners Bootstrapping Now", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 12, + "y": 0 + }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(max by (pod, listener) (fint_consumer_kafka_bootstrap_partitions_pending{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}))", + "instant": true, + "legendFormat": "", + "refId": "A" + } + ], + "title": "Pending Bootstrap Partitions Now", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "decimals": 0, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 18, + "y": 0 + }, + "id": 4, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "sum" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(increase(fint_consumer_kafka_runtime_problem_total{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}[24h]))", + "instant": true, + "legendFormat": "", + "refId": "A" + } + ], + "title": "Runtime Problems Last 24h", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 15, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 2, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 5 + }, + "id": 5, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max by (pod, listener) (fint_consumer_kafka_bootstrap_state{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"})", + "legendFormat": "{{pod}} / {{listener}}", + "refId": "A" + } + ], + "title": "Bootstrap State", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 15, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 2, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 5 + }, + "id": 6, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max by (pod, listener) (fint_consumer_kafka_bootstrap_partitions_pending{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"})", + "legendFormat": "{{pod}} / {{listener}}", + "refId": "A" + } + ], + "title": "Pending Bootstrap Partitions", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 15, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 2, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 13 + }, + "id": 7, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max by (pod, listener) (fint_consumer_kafka_runtime_unhealthy{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"})", + "legendFormat": "{{pod}} / {{listener}}", + "refId": "A" + } + ], + "title": "Runtime Unhealthy", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 15, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 2, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 60000 + }, + { + "color": "red", + "value": 900000 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 13 + }, + "id": 8, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max by (pod, listener) (fint_consumer_kafka_runtime_problem_duration{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"})", + "legendFormat": "{{pod}} / {{listener}}", + "refId": "A" + } + ], + "title": "Runtime Problem Duration", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "bars", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "normal" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 21 + }, + "id": 9, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum by (listener, reason) (increase(fint_consumer_kafka_runtime_problem_total{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}[24h]))", + "legendFormat": "{{listener}} / {{reason}}", + "refId": "A" + } + ], + "title": "Runtime Problem Events Last 24h", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "bars", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 6, + "x": 12, + "y": 21 + }, + "id": 10, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum by (listener) (increase(fint_consumer_kafka_bootstrap_completed_total{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}[24h]))", + "legendFormat": "{{listener}}", + "refId": "A" + } + ], + "title": "Bootstrap Completed Last 24h", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "bars", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 6, + "x": 18, + "y": 21 + }, + "id": 11, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "(sum by (listener) (increase(fint_consumer_kafka_bootstrap_duration_seconds_sum{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}[24h])))/(clamp_min(sum by (listener) (increase(fint_consumer_kafka_bootstrap_duration_seconds_count{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}[24h])), 1))", + "legendFormat": "{{listener}}", + "refId": "A" + } + ], + "title": "Average Bootstrap Duration Last 24h", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "bars", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 29 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum by (listener) (increase(fint_consumer_kafka_bootstrap_end_offset_lookup_failures_total{namespace=~\"$namespace\", pod=~\"$pod\", listener=~\"$listener\"}[24h]))", + "legendFormat": "{{listener}}", + "refId": "A" + } + ], + "title": "Bootstrap End Offset Lookup Failures Last 24h", + "type": "timeseries" + } + ], + "refresh": "30s", + "schemaVersion": 39, + "style": "dark", + "tags": [ + "fint", + "kafka", + "health" + ], + "templating": { + "list": [ + { + "current": { + "selected": true, + "text": "Prometheus", + "value": "Prometheus" + }, + "hide": 0, + "includeAll": false, + "label": "Datasource", + "name": "datasource", + "options": [], + "query": "prometheus", + "refresh": 1, + "regex": "", + "type": "datasource" + }, + { + "allValue": ".*", + "current": { + "selected": true, + "text": "All", + "value": ".*" + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "definition": "label_values(fint_consumer_kafka_runtime_unhealthy, namespace)", + "hide": 0, + "includeAll": true, + "label": "Namespace", + "multi": false, + "name": "namespace", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(fint_consumer_kafka_runtime_unhealthy, namespace)", + "refId": "Prometheus-namespace" + }, + "refresh": 1, + "regex": "", + "type": "query" + }, + { + "allValue": ".*", + "current": { + "selected": true, + "text": "All", + "value": ".*" + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "definition": "label_values(fint_consumer_kafka_runtime_unhealthy{namespace=~\"$namespace\"}, pod)", + "hide": 0, + "includeAll": true, + "label": "Pod", + "multi": true, + "name": "pod", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(fint_consumer_kafka_runtime_unhealthy{namespace=~\"$namespace\"}, pod)", + "refId": "Prometheus-pod" + }, + "refresh": 1, + "regex": "", + "type": "query" + }, + { + "allValue": ".*", + "current": { + "selected": true, + "text": "All", + "value": ".*" + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "definition": "label_values(fint_consumer_kafka_runtime_unhealthy{namespace=~\"$namespace\", pod=~\"$pod\"}, listener)", + "hide": 0, + "includeAll": true, + "label": "Listener", + "multi": true, + "name": "listener", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(fint_consumer_kafka_runtime_unhealthy{namespace=~\"$namespace\", pod=~\"$pod\"}, listener)", + "refId": "Prometheus-listener" + }, + "refresh": 1, + "regex": "", + "type": "query" + } + ] + }, + "time": { + "from": "now-24h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Fint Core Consumer Kafka Health", + "uid": "fint-core-consumer-kafka-health", + "version": 1, + "weekStart": "" +} diff --git a/docs/kafka-health-checks.md b/docs/kafka-health-checks.md new file mode 100644 index 00000000..ed3a693c --- /dev/null +++ b/docs/kafka-health-checks.md @@ -0,0 +1,272 @@ +# Kafka Health Checks + +Dette dokumentet beskriver hvordan health-checkene i `fint-core-consumer` fungerer etter innføringen av Kafka-basert readiness og liveness. + +## Oversikt + +Applikasjonen bruker tre forskjellige typer health/probes i Kubernetes: + +- `startupProbe`: brukes bare helt tidlig for å verifisere at JVM og Spring Boot faktisk starter. +- `readinessProbe`: brukes for å avgjøre om poden trygt kan motta trafikk. +- `livenessProbe`: brukes for å avgjøre om poden fortsatt lever, eller om Kubernetes skal restarte den. + +Disse probe-typene har forskjellig ansvar og skal ikke blandes: + +- `startupProbe` skal ikke vite noe om hvor langt Kafka-consumerne har kommet i bootstrap. +- `readinessProbe` skal blokkere trafikk til initial bootstrap er ferdig. +- `livenessProbe` skal ikke feile bare fordi applikasjonen ligger litt etter i konsumering; den skal feile hvis Kafka-consumerne i praksis har sluttet å fungere over tid. + +## Hvordan Consumer bruker dem + +Consumer eksponerer actuator-endepunktene: + +- `/actuator/health/readiness` +- `/actuator/health/liveness` + +Readiness er koblet til en initial Kafka-bootstrap-tracker. Liveness er koblet til en separat Kafka-runtime-monitor. + +## Readiness + +### Hensikt + +Readiness skal beskytte trafikk mot en pod som ennå ikke har bygd opp lokal cache ved oppstart. + +### Hvordan den virker + +Ved oppstart settes readiness til `REFUSING_TRAFFIC`. + +Følgende listeners er definert som blokkerende for initial bootstrap: + +- `entity` +- `request-fint-event` +- `event-response` +- `relation-update` + +For hver assigned partition hentes "startup end offset" fra Kafka i det assignment skjer. Deretter spores prosesserte offsets mens records behandles. + +Listeneren regnes som ferdig når alle dens assigned partitions har konsumert seg opp til offseten som gjaldt ved oppstartstidspunktet. + +Applikasjonen regnes som `ready` når alle blokkerende listeners er ferdige. + +### Viktig nyanse + +Dette er en `initial-only` readiness. + +Det betyr: + +- Readiness blokkerer ved oppstart. +- Readiness går til `UP` når initial bootstrap er ferdig. +- Readiness går ikke ned igjen senere bare fordi det kommer flere meldinger, full sync, eller midlertidig lag. + +Dette er bevisst. Etter at poden er sluppet i trafikk, skal vanlig Kafka-lag ikke stoppe lesetrafikk. + +### Hva får readiness til å feile + +Readiness blir `OUT_OF_SERVICE` hvis minst ett av disse forholdene gjelder under oppstart: + +- minst én blokkerende listener har ikke konsumert alle sine assigned partitions opp til startup-end-offset. +- Kafka end-offset kan ikke hentes for assigned partitions. + +### Hva får ikke readiness til å feile + +Følgende forhold feiler ikke readiness etter at bootstrap er ferdig: + +- En ny full sync kommer inn og skaper lag. +- Det produseres mange meldinger mens appen kjører. +- Consumeren ligger midlertidig etter på topicet. +- Topicet er stille i lang tid. + +## Liveness + +### Hensikt + +Liveness skal oppdage at Kafka-consumerne i praksis har sluttet å fungere, og gi Kubernetes grunnlag for å restarte poden. + +### Hvordan den virker + +Liveness monitorerer runtime-status for registrerte Kafka-listeners. + +Den ser ikke på vanlig lag eller antall uprosesserte meldinger. I stedet ser den på Kafka-runtime-signaler: + +- `ConsumerStartedEvent` +- `ListenerContainerIdleEvent` +- `NonResponsiveConsumerEvent` +- `ConsumerFailedToStartEvent` +- `ConsumerStoppedEvent` + +Det brukes en grace-periode, default `15m`, for å unngå falske positive ved korte forstyrrelser. + +### Hva som holder liveness frisk + +Liveness holdes `UP` hvis en listener viser tegn til normal drift, for eksempel: + +- appen prosesserer records +- listeneren sender idle-events fordi topicet er stille +- consumer-containeren starter normalt + +Det betyr at stille topics ikke i seg selv gjør poden unhealthy. + +### Hva får liveness til å feile + +Liveness blir `DOWN` hvis en registrert listener har en runtime-feil som varer lenger enn konfigurert grace-periode. + +Eksempler: + +- `NonResponsiveConsumerEvent` og tilstanden varer lenger enn grace-perioden +- `ConsumerFailedToStartEvent` +- `ConsumerStoppedEvent` med annen grunn enn `NORMAL` + +Typiske scenarioer dette er ment å fange: + +- nettverksbrudd mellom pod og Kafka +- Kafka svarer ikke over tid +- autentiseringsfeil mot Kafka +- consumer-container stopper på grunn av feil + +### Hva får ikke liveness til å feile + +Følgende forhold skal ikke alene feile liveness: + +- vanlig Kafka-lag +- full sync som gjør at consumeren henger litt etter +- ingen nye meldinger på topicet i flere timer +- normal rebalance mellom pods + +En vanlig rebalance håndteres av partition assignment/revocation, ikke som en fatal liveness-feil. + +## Startup Probe + +### Hensikt + +`startupProbe` bør bare brukes som en enkel oppstartssperre mens JVM og Spring Boot kommer opp. + +Den bør ikke inneholde Kafka-bootstrap-logikk. Grunnen er at `startupProbe` bare brukes i den tidlige oppstartsfasen, mens readiness kan uttrykke "ikke klar enda" på en mer presis måte. + +### Anbefaling + +Bruk `startupProbe` mot en enkel actuator-health, mens readiness og liveness peker mot de dedikerte probe-endepunktene. + +## Konfigurasjon i Consumer + +Default konfigurasjon i applikasjonen: + +```yaml +fint: + consumer: + health: + kafka: + idle-event-interval: 1m + runtime-grace-period: 15m + monitor-interval-seconds: 30 + no-poll-threshold: 3.0 + +management: + endpoint: + health: + probes: + enabled: true + group: + readiness: + include: readinessState,initialKafkaBootstrap + liveness: + include: livenessState,kafkaRuntime +``` + +### Hva disse Kafka-innstillingene betyr + +- `idle-event-interval`: hvor ofte idle-event sendes mens et topic er stille. +- `runtime-grace-period`: hvor lenge runtime-feil kan vare før liveness går ned. +- `monitor-interval-seconds`: hvor ofte Spring Kafka sjekker consumerens poll-aktivitet. +- `no-poll-threshold`: terskel for når manglende poll anses som "non-responsive". + +## Metrikker + +I tillegg til actuator-health eksponerer applikasjonen nå Micrometer-metrikker for Kafka-bootstrap og Kafka-runtime-health. Disse er nyttige fordi health-endepunktene bare viser nåværende status, mens metrikker gjør det mulig å følge utvikling over tid i Prometheus og Grafana. + +Et eksempel-dashboard for Grafana ligger i [docs/grafana/fint-core-consumer-kafka-health-dashboard.json](/docs/grafana/fint-core-consumer-kafka-health-dashboard.json). +Dashboardet antar standard Prometheus/Kubernetes-labels som `namespace` og `pod`. Hvis scrape-labels hos dere heter noe annet, må variablene i dashboardet justeres tilsvarende. + +### Bootstrap-metrikker + +- `fint.consumer.kafka.bootstrap.state` + Gauge per listener. `1` betyr at initial bootstrap fortsatt pågår, `0` betyr at den er ferdig. + +- `fint.consumer.kafka.bootstrap.partitions.pending` + Gauge per listener. Antall assigned partitions som ennå ikke har konsumert seg opp til startup-end-offset. + +- `fint.consumer.kafka.bootstrap.completed` + Counter per listener, og også for `listener=all`. Incrementes når bootstrap fullføres. + +- `fint.consumer.kafka.bootstrap.duration` + Timer per listener, og også for `listener=all`. Måler hvor lang tid initial bootstrap faktisk tok. + +- `fint.consumer.kafka.bootstrap.end_offset.lookup.failures` + Counter per listener. Incrementes når applikasjonen ikke klarer å hente startup end offset fra Kafka. + +### Runtime-metrikker + +- `fint.consumer.kafka.runtime.problem` + Counter med tags `listener` og `reason`. Incrementes når runtime-monitoren ser et problem, for eksempel `NON_RESPONSIVE` eller `STOPPED_AUTH`. + +- `fint.consumer.kafka.runtime.unhealthy` + Gauge per listener. `1` betyr at listeneren har vært i problemtilstand lenger enn grace-perioden og dermed gjør liveness `DOWN`. + +- `fint.consumer.kafka.runtime.problem.duration` + Gauge per listener. Viser hvor lenge den nåværende problemtilstanden har vart, i millisekunder. + +### Viktige tags + +Metrikkene er bevisst tagget lavt-kardinalt: + +- `listener` +- `reason` + +Det brukes ikke tags som Kafka-key, partition eller corrId, for å unngå høy kardinalitet og unødvendig støy i metrics-backend. + +## Eksempel i Kubernetes + +Eksempel på probe-oppsett: + +```yaml +startupProbe: + httpGet: + path: /utdanning/vurdering/actuator/health + port: 8080 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 60 + +readinessProbe: + httpGet: + path: /utdanning/vurdering/actuator/health/readiness + port: 8080 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 3 + +livenessProbe: + httpGet: + path: /utdanning/vurdering/actuator/health/liveness + port: 8080 + periodSeconds: 30 + timeoutSeconds: 3 + failureThreshold: 3 +``` + +## Praktiske konsekvenser + +Med dette oppsettet blir flyten typisk slik: + +1. Poden starter. +2. `startupProbe` verifiserer at appen faktisk kommer opp. +3. `readinessProbe` holder poden ute av trafikk mens `entity` og `relation-update` bygger initial cache. +4. Når initial bootstrap er ferdig, blir poden `Ready`. +5. Senere full sync eller vanlig Kafka-lag påvirker ikke readiness. +6. Hvis Kafka-consumerne blir ikke-responsive eller stopper over tid, blir `liveness` `DOWN` og Kubernetes kan restarte poden. + +## Oppsummering + +- `startupProbe` beskytter bare oppstart av prosessen. +- `readinessProbe` beskytter trafikk under initial Kafka-bootstrap. +- `livenessProbe` beskytter mot vedvarende Kafka-runtime-feil. +- Vanlig lag eller stille topics skal ikke gjøre poden unhealthy. diff --git a/src/integrationTest/java/no/fintlabs/consumer/integration/LegacyResourceTopicIT.kt b/src/integrationTest/java/no/fintlabs/consumer/integration/LegacyResourceTopicIT.kt deleted file mode 100644 index 8b35abb3..00000000 --- a/src/integrationTest/java/no/fintlabs/consumer/integration/LegacyResourceTopicIT.kt +++ /dev/null @@ -1,138 +0,0 @@ -package no.fintlabs.consumer.integration - -import com.fasterxml.jackson.databind.ObjectMapper -import no.fintlabs.Application -import no.fintlabs.adapter.models.sync.SyncType -import no.fintlabs.cache.CacheService -import no.fintlabs.utils.EntityProducer -import no.novari.fint.model.felles.kompleksedatatyper.Identifikator -import no.novari.fint.model.resource.Link -import no.novari.fint.model.resource.utdanning.timeplan.FagResource -import org.awaitility.kotlin.await -import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.Test -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.boot.test.web.server.LocalServerPort -import org.springframework.kafka.test.context.EmbeddedKafka -import org.springframework.test.annotation.DirtiesContext -import org.springframework.test.context.TestPropertySource -import org.springframework.test.web.reactive.server.WebTestClient -import java.time.Clock -import java.time.Duration -import java.util.UUID -import java.util.concurrent.TimeUnit -import kotlin.test.assertEquals - -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = [Application::class]) -@EmbeddedKafka(partitions = 1) -@TestPropertySource( - properties = [ - "spring.kafka.bootstrap-servers=\${spring.embedded.kafka.brokers}", - "spring.kafka.consumer.auto-offset-reset=earliest", - "spring.kafka.consumer.group-id=legacy-resource-topic-it", - "novari.kafka.default-replicas=1", - "fint.relation.base-url=https://foo.org", - "fint.org-id=foo.org", - "fint.consumer.domain=utdanning", - "fint.consumer.package=timeplan", - "fint.consumer.kafka.consume-legacy-resource-topics=true", - "fint.security.enabled=false", - ], -) -@DirtiesContext -class LegacyResourceTopicIT { - @LocalServerPort - private var port: Int = 0 - - private val client by lazy { - WebTestClient - .bindToServer() - .baseUrl("http://localhost:$port/utdanning/timeplan") - .responseTimeout(Duration.ofSeconds(10)) - .build() - } - - @Autowired - lateinit var objectMapper: ObjectMapper - - @Autowired - lateinit var cacheService: CacheService - - @Autowired - lateinit var entityProducer: EntityProducer - - private val clock: Clock = Clock.systemUTC() - - @AfterEach - fun tearDown() { - cacheService.getCache("fag").evictExpired(Long.MAX_VALUE) - } - - @Test - fun `resource published to legacy topic with resource-name header is cached`() { - entityProducer - .publishToLegacyResourceTopic( - resourceName = "fag", - resource = createFag("1", "Fag 1"), - resourceId = "1", - syncType = SyncType.FULL, - syncCorrId = UUID.randomUUID().toString(), - syncTotalSize = 1, - timestamp = clock.millis(), - includeResourceNameHeader = true, - ).get(10, TimeUnit.SECONDS) - - await.atMost(Duration.ofSeconds(30)).untilAsserted { - val resources = fetchAllFagResources() - assertEquals(1, resources.size) - assertEquals("Fag 1", resources.first().navn) - } - } - - @Test - fun `resource published to legacy topic without resource-name header falls back to topic name and is cached`() { - entityProducer - .publishToLegacyResourceTopic( - resourceName = "fag", - resource = createFag("2", "Fag 2"), - resourceId = "2", - syncType = SyncType.FULL, - syncCorrId = UUID.randomUUID().toString(), - syncTotalSize = 1, - timestamp = clock.millis(), - includeResourceNameHeader = false, - ).get(10, TimeUnit.SECONDS) - - await.atMost(Duration.ofSeconds(30)).untilAsserted { - val resources = fetchAllFagResources() - assertEquals(1, resources.size) - assertEquals("Fag 2", resources.first().navn) - } - } - - private fun createFag( - id: String, - navn: String, - ): FagResource { - val fag = FagResource() - fag.systemId = Identifikator().apply { identifikatorverdi = "systemid-fag-$id" } - fag.navn = navn - fag.links["self"] = listOf(Link("https://foo.org/utdanning/timeplan/fag/systemid/$id")) - return fag - } - - private fun fetchAllFagResources(): List { - val page = - client - .get() - .uri("/fag") - .exchange() - .expectStatus() - .isOk - .expectBody(FintResourcesPage::class.java) - .returnResult() - .responseBody ?: return emptyList() - return page.getResources(objectMapper, FagResource::class.java) - } -} diff --git a/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt b/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt index c43e0ad4..213527b2 100644 --- a/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt +++ b/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt @@ -2,6 +2,9 @@ package no.fintlabs.autorelation.kafka import no.fintlabs.autorelation.RelationEventService import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConstants.RESOURCE_NAME import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.applyConsumerFetchSettings @@ -24,13 +27,15 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class AutoRelationEntityConsumer( private val consumerConfig: ConsumerConfiguration, private val relationEventService: RelationEventService, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(AutoRelationEntityConsumer::class.java) private const val CONSUMER_NAME = "autorelation-entity" } - @Bean + @Bean(name = [KafkaListenerIds.AUTORELATION_ENTITY]) @ConditionalOnProperty( name = ["fint.consumer.autorelation.enabled"], havingValue = "true", @@ -39,8 +44,10 @@ class AutoRelationEntityConsumer( fun autorelationEntityConsumerContainer( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.AUTORELATION_ENTITY) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( Any::class.java, this::consumeRecord, @@ -66,6 +73,7 @@ class AutoRelationEntityConsumer( container.concurrency = consumerConfig.kafka.entityConcurrency container.containerProperties.idleBetweenPolls = consumerConfig.kafka.idleBetweenPolls container.applyConsumerFetchSettings(consumerConfig.kafka) + kafkaListenerContainerHealthConfigurer.customize(container) container.applyStartupJitter(consumerConfig.kafka) }, ).createContainer( @@ -80,8 +88,9 @@ class AutoRelationEntityConsumer( ).resourceName("${consumerConfig.domain}-${consumerConfig.packageName}") .build(), ) + } - fun consumeRecord(consumerRecord: ConsumerRecord) { + fun consumeRecord(consumerRecord: ConsumerRecord) = consumerRecord .value() ?.let { resource -> @@ -90,8 +99,7 @@ class AutoRelationEntityConsumer( consumerRecord.extractIdentifier(), resource, ) - } - } + }.also { kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.AUTORELATION_ENTITY) } private fun ConsumerRecord.getResourceName(): String = headers().stringValue(RESOURCE_NAME) ?: throw IllegalArgumentException("Resource name header not found") diff --git a/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt b/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt index ad75d53f..5a9c4161 100644 --- a/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt +++ b/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt @@ -3,6 +3,10 @@ package no.fintlabs.autorelation.kafka import no.fintlabs.autorelation.AutoRelationService import no.fintlabs.autorelation.model.RelationUpdate import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.KafkaThroughputMetrics import no.fintlabs.consumer.kafka.applyConsumerFetchSettings @@ -25,13 +29,16 @@ class RelationUpdateConsumer( private val autoRelationService: AutoRelationService, private val consumerConfig: ConsumerConfiguration, private val kafkaThroughputMetrics: KafkaThroughputMetrics, + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(RelationUpdateConsumer::class.java) private const val CONSUMER_NAME = "relation-update" } - @Bean + @Bean(name = [KafkaListenerIds.RELATION_UPDATE]) @ConditionalOnProperty( name = ["fint.consumer.autorelation.enabled"], havingValue = "true", @@ -40,8 +47,11 @@ class RelationUpdateConsumer( fun relationUpdateConsumerContainer( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + initialKafkaBootstrapTracker.registerBlockingListener(KafkaListenerIds.RELATION_UPDATE) + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.RELATION_UPDATE) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( RelationUpdate::class.java, this::consumeRecord, @@ -50,8 +60,14 @@ class RelationUpdateConsumer( .groupIdApplicationDefaultWithUniqueSuffix() .maxPollRecordsKafkaDefault() .maxPollIntervalKafkaDefault() - .seekToBeginningOnAssignment() - .build(), + .seekToBeginningAndPerformOperationOnAssignment { assignments -> + initialKafkaBootstrapTracker.onPartitionsAssigned( + KafkaListenerIds.RELATION_UPDATE, + assignments.keys, + ) + }.onRevocation { partitions -> + initialKafkaBootstrapTracker.onPartitionsRevoked(KafkaListenerIds.RELATION_UPDATE, partitions) + }.build(), errorHandlerFactory.createErrorHandler( KafkaConsumerErrorHandling.createLoggingErrorHandlerConfiguration( logger, @@ -62,6 +78,7 @@ class RelationUpdateConsumer( container.concurrency = consumerConfig.kafka.relationConcurrency container.containerProperties.idleBetweenPolls = consumerConfig.kafka.idleBetweenPolls container.applyConsumerFetchSettings(consumerConfig.kafka) + kafkaListenerContainerHealthConfigurer.customize(container) container.applyStartupJitter(consumerConfig.kafka) }, ).createContainer( @@ -73,13 +90,13 @@ class RelationUpdateConsumer( .orgId(TopicNamePatternParameterPattern.exactly(consumerConfig.orgId.asTopicSegment)) .domainContextApplicationDefault() .build(), - // Makes sure we listen to component patterns such as utdanning-vurdering'-relation-update' ).resource( TopicNamePatternParameterPattern.exactly( "${consumerConfig.domain}-${consumerConfig.packageName}-relation-update", ), ).build(), ) + } fun consumeRecord(consumerRecord: ConsumerRecord) { val startedAt = System.nanoTime() @@ -89,5 +106,6 @@ class RelationUpdateConsumer( relationUpdate.targetEntity.resourceName, System.nanoTime() - startedAt, ) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) } } diff --git a/src/main/java/no/fintlabs/consumer/config/ConsumerConfiguration.kt b/src/main/java/no/fintlabs/consumer/config/ConsumerConfiguration.kt index 88fbc38d..e84ae74b 100644 --- a/src/main/java/no/fintlabs/consumer/config/ConsumerConfiguration.kt +++ b/src/main/java/no/fintlabs/consumer/config/ConsumerConfiguration.kt @@ -46,7 +46,6 @@ data class ConsumerConfiguration( // TODO: Cleanup configuration data class KafkaConfiguration( // Entity consumption in EntityConsumer & AutoRelationEntityConsumer - val consumeLegacyResourceTopics: Boolean = false, val entityConcurrency: Int = 1, val relationEntitySeekToBeginning: Boolean = false, val fetchMinBytes: Int = 65536, diff --git a/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt b/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt new file mode 100644 index 00000000..dfd0f69e --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt @@ -0,0 +1,8 @@ +package no.fintlabs.consumer.health + +data class BootstrapPartitionStatus( + val partition: String, + val endOffset: Long?, + val processedOffset: Long?, + val caughtUp: Boolean, +) diff --git a/src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt b/src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt new file mode 100644 index 00000000..1b352953 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt @@ -0,0 +1,6 @@ +package no.fintlabs.consumer.health + +data class BootstrapReadinessSnapshot( + val ready: Boolean, + val blockingListeners: List, +) diff --git a/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt b/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt new file mode 100644 index 00000000..c6ed1135 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt @@ -0,0 +1,33 @@ +package no.fintlabs.consumer.health + +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.OffsetSpec +import org.apache.kafka.common.TopicPartition +import org.springframework.stereotype.Component +import java.time.Duration +import java.util.concurrent.TimeUnit + +interface EndOffsetProvider { + fun latestOffsets(partitions: Set): Map +} + +@Component +class KafkaAdminEndOffsetProvider( + private val adminClient: AdminClient, +) : EndOffsetProvider { + override fun latestOffsets(partitions: Set): Map { + if (partitions.isEmpty()) { + return emptyMap() + } + + return adminClient + .listOffsets(partitions.associateWith { OffsetSpec.latest() }) + .all() + .get(TIMEOUT.toSeconds(), TimeUnit.SECONDS) + .mapValues { (_, result) -> result.offset() } + } + + companion object { + private val TIMEOUT = Duration.ofSeconds(10) + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt new file mode 100644 index 00000000..fbe89c6c --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt @@ -0,0 +1,31 @@ +package no.fintlabs.consumer.health + +import org.springframework.boot.actuate.health.Health +import org.springframework.boot.actuate.health.HealthIndicator +import org.springframework.stereotype.Component + +@Component("initialKafkaBootstrap") +class InitialKafkaBootstrapHealthIndicator( + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, +) : HealthIndicator { + override fun health(): Health { + val snapshot = initialKafkaBootstrapTracker.snapshot() + val builder = if (snapshot.ready) Health.up() else Health.outOfService() + + return builder + .withDetail("ready", snapshot.ready) + .withDetail("blockingListeners", snapshot.blockingListeners.size) + .withDetail( + "listeners", + snapshot.blockingListeners.associate { listener -> + listener.listenerId to + mapOf( + "assignmentSeen" to listener.assignmentSeen, + "completed" to listener.completed, + "assignedPartitions" to listener.assignedPartitions, + "caughtUpPartitions" to listener.caughtUpPartitions, + ) + }, + ).build() + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt new file mode 100644 index 00000000..0ebb326b --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt @@ -0,0 +1,297 @@ +package no.fintlabs.consumer.health + +import jakarta.annotation.PostConstruct +import jakarta.annotation.PreDestroy +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.slf4j.LoggerFactory +import org.springframework.boot.availability.AvailabilityChangeEvent +import org.springframework.boot.availability.ReadinessState +import org.springframework.context.ApplicationContext +import org.springframework.stereotype.Service +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executors +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference +import kotlin.math.max + +@Service +class InitialKafkaBootstrapTracker( + private val endOffsetProvider: EndOffsetProvider, + private val applicationContext: ApplicationContext, + private val kafkaHealthMetrics: KafkaHealthMetrics, + private val kafkaHealthProperties: KafkaHealthProperties, +) { + private val readinessPublished = AtomicReference(null) + private val bootstrapCompleted = AtomicBoolean(false) + private val blockingListeners = ConcurrentHashMap() + private val executorRef = AtomicReference(null) + + init { + publishReadiness(false) + } + + @PostConstruct + fun startEndOffsetRefresh() { + val executor = + Executors.newSingleThreadScheduledExecutor { runnable -> + Thread(runnable, "kafka-bootstrap-end-offsets").apply { isDaemon = true } + } + executorRef.set(executor) + val intervalMs = kafkaHealthProperties.bootstrapEndOffsetRefreshInterval.toMillis().coerceAtLeast(1L) + executor.scheduleWithFixedDelay(::tickRefresh, intervalMs, intervalMs, TimeUnit.MILLISECONDS) + } + + @PreDestroy + fun stopEndOffsetRefresh() { + val executor = executorRef.getAndSet(null) ?: return + executor.shutdown() + try { + val timeoutMs = kafkaHealthProperties.bootstrapEndOffsetExecutorShutdownTimeout.toMillis() + if (!executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) { + executor.shutdownNow() + } + } catch (_: InterruptedException) { + executor.shutdownNow() + Thread.currentThread().interrupt() + } + } + + fun registerBlockingListener(listenerId: String) { + blockingListeners.computeIfAbsent(listenerId) { ListenerBootstrapState() } + kafkaHealthMetrics.registerBootstrapListener(listenerId) + } + + fun onPartitionsAssigned( + listenerId: String, + assignments: Set, + ) { + if (bootstrapCompleted.get() || assignments.isEmpty()) { + return + } + + val listenerState = blockingListeners[listenerId] ?: return + + assignments.forEach { topicPartition -> + listenerState.partitions.computeIfAbsent(topicPartition) { + PartitionBootstrapState(endOffset = null) + } + } + listenerState.assignmentSeen.set(true) + + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + + triggerImmediateRefresh() + } + + fun onPartitionsRevoked( + listenerId: String, + partitions: Collection, + ) { + if (bootstrapCompleted.get()) { + return + } + + val listenerState = blockingListeners[listenerId] ?: return + partitions.forEach(listenerState.partitions::remove) + maybeCompleteListener(listenerId, listenerState) + maybeCompleteBootstrap() + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + } + + fun onRecordProcessed( + listenerId: String, + record: ConsumerRecord<*, *>, + ) { + if (bootstrapCompleted.get()) { + return + } + + val listenerState = blockingListeners[listenerId] ?: return + val topicPartition = TopicPartition(record.topic(), record.partition()) + listenerState.partitions.computeIfPresent(topicPartition) { _, state -> + state.withProcessedOffset(record.offset()) + } + maybeCompleteListener(listenerId, listenerState) + maybeCompleteBootstrap() + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + } + + fun snapshot(): BootstrapReadinessSnapshot { + val listenerStatuses = + blockingListeners.toSortedMap().map { (listenerId, listenerState) -> + ListenerBootstrapStatus( + listenerId = listenerId, + assignmentSeen = listenerState.assignmentSeen.get(), + completed = listenerState.completed.get(), + assignedPartitions = listenerState.partitions.size, + caughtUpPartitions = listenerState.partitions.values.count(PartitionBootstrapState::caughtUp), + partitions = + listenerState.partitions + .toSortedMap(compareBy({ it.topic() }, { it.partition() })) + .map { (topicPartition, partitionState) -> + BootstrapPartitionStatus( + partition = "${topicPartition.topic()}-${topicPartition.partition()}", + endOffset = partitionState.endOffset, + processedOffset = partitionState.processedOffset, + caughtUp = partitionState.caughtUp, + ) + }, + ) + } + + return BootstrapReadinessSnapshot( + ready = bootstrapCompleted.get(), + blockingListeners = listenerStatuses, + ) + } + + internal fun refreshPendingEndOffsets() { + if (bootstrapCompleted.get()) { + return + } + + val pendingByListener = + blockingListeners + .mapValues { (_, state) -> + state.partitions + .entries + .asSequence() + .filter { it.value.endOffset == null } + .map { it.key } + .toSet() + }.filterValues { it.isNotEmpty() } + + if (pendingByListener.isEmpty()) { + return + } + + val allPartitions = pendingByListener.values.flatten().toSet() + val results = + try { + endOffsetProvider.latestOffsets(allPartitions) + } catch (exception: Exception) { + logger.warn( + "End-offset lookup failed for {} partitions across {} listeners; will retry", + allPartitions.size, + pendingByListener.size, + exception, + ) + pendingByListener.keys.forEach(kafkaHealthMetrics::recordBootstrapEndOffsetLookupFailure) + return + } + + var anyApplied = false + pendingByListener.forEach { (listenerId, partitions) -> + val listenerState = blockingListeners[listenerId] ?: return@forEach + var listenerUpdated = false + partitions.forEach { topicPartition -> + val offset = results[topicPartition] ?: return@forEach + listenerState.partitions.computeIfPresent(topicPartition) { _, existing -> + if (existing.endOffset == null) { + listenerUpdated = true + existing.copy(endOffset = offset) + } else { + existing + } + } + } + if (listenerUpdated) { + anyApplied = true + maybeCompleteListener(listenerId, listenerState) + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + } + } + + if (anyApplied) { + maybeCompleteBootstrap() + } + } + + private fun tickRefresh() { + try { + refreshPendingEndOffsets() + } catch (exception: Exception) { + logger.error("Unexpected error during end-offset refresh tick", exception) + } + } + + private fun triggerImmediateRefresh() { + val executor = executorRef.get() ?: return + try { + executor.execute(::tickRefresh) + } catch (_: RejectedExecutionException) { + // Executor is shutting down — the next scheduled tick (if any) will pick this up. + } + } + + private fun maybeCompleteListener( + listenerId: String, + listenerState: ListenerBootstrapState, + ) { + if ( + !listenerState.completed.get() && + listenerState.assignmentSeen.get() && + listenerState.partitions.values.all(PartitionBootstrapState::caughtUp) + ) { + listenerState.completed.set(true) + kafkaHealthMetrics.markBootstrapCompleted(listenerId) + logger.info("Initial Kafka bootstrap completed for listener={}", listenerId) + } + } + + private fun maybeCompleteBootstrap() { + if ( + !bootstrapCompleted.get() && + blockingListeners.isNotEmpty() && + blockingListeners.values.all { it.completed.get() } + ) { + bootstrapCompleted.set(true) + kafkaHealthMetrics.markBootstrapAllCompleted() + publishReadiness(true) + logger.info("Initial Kafka bootstrap completed for all blocking listeners") + } + } + + private fun pendingPartitions(listenerState: ListenerBootstrapState): Int { + return listenerState.partitions.values.count { !it.caughtUp } + } + + private fun publishReadiness(ready: Boolean) { + val changed = readinessPublished.getAndSet(ready) != ready + if (changed) { + AvailabilityChangeEvent.publish( + applicationContext, + if (ready) ReadinessState.ACCEPTING_TRAFFIC else ReadinessState.REFUSING_TRAFFIC, + ) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(InitialKafkaBootstrapTracker::class.java) + } +} + +private class ListenerBootstrapState { + val completed = AtomicBoolean(false) + val assignmentSeen = AtomicBoolean(false) + val partitions = ConcurrentHashMap() +} + +private data class PartitionBootstrapState( + val endOffset: Long?, + val processedOffset: Long? = null, +) { + val caughtUp: Boolean + get() { + val end = endOffset ?: return false + return end == 0L || ((processedOffset ?: -1L) + 1) >= end + } + + fun withProcessedOffset(offset: Long): PartitionBootstrapState { + return copy(processedOffset = processedOffset?.let { max(it, offset) } ?: offset) + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt b/src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt new file mode 100644 index 00000000..d5a1004b --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt @@ -0,0 +1,11 @@ +package no.fintlabs.consumer.health + +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import java.time.Clock + +@Configuration +class KafkaHealthConfiguration { + @Bean + fun kafkaHealthClock(): Clock = Clock.systemUTC() +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt b/src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt new file mode 100644 index 00000000..210dcac9 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt @@ -0,0 +1,190 @@ +package no.fintlabs.consumer.health + +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tag +import io.micrometer.core.instrument.Timer +import org.springframework.stereotype.Component +import java.time.Clock +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + +@Component +class KafkaHealthMetrics( + private val meterRegistry: MeterRegistry, + private val clock: Clock, + private val kafkaHealthProperties: KafkaHealthProperties, +) { + private val counters = ConcurrentHashMap() + private val timers = ConcurrentHashMap() + private val bootstrapStates = ConcurrentHashMap() + private val runtimeStates = ConcurrentHashMap() + private val bootstrapAllStartNanos = AtomicLong(System.nanoTime()) + private val bootstrapAllRecorded = AtomicBoolean(false) + + fun registerBootstrapListener(listenerId: String) { + bootstrapStates.computeIfAbsent(listenerId) { + BootstrapMetricState(System.nanoTime()).also { state -> + meterRegistry.gauge( + "fint.consumer.kafka.bootstrap.state", + listOf(Tag.of("listener", listenerId)), + state, + ) { it.inProgress.get().toDouble() } + meterRegistry.gauge( + "fint.consumer.kafka.bootstrap.partitions.pending", + listOf(Tag.of("listener", listenerId)), + state, + ) { it.pendingPartitions.get().toDouble() } + } + } + } + + fun updateBootstrapPendingPartitions( + listenerId: String, + pendingPartitions: Int, + ) { + bootstrapStates[listenerId]?.pendingPartitions?.set(pendingPartitions) + } + + fun markBootstrapCompleted(listenerId: String) { + bootstrapStates[listenerId]?.let { state -> + state.pendingPartitions.set(0) + state.inProgress.set(0) + if (state.completed.compareAndSet(false, true)) { + counter( + "fint.consumer.kafka.bootstrap.completed", + listOf(Tag.of("listener", listenerId)), + ).increment() + timer( + "fint.consumer.kafka.bootstrap.duration", + listOf(Tag.of("listener", listenerId)), + ).record(System.nanoTime() - state.startNanos, TimeUnit.NANOSECONDS) + } + } + } + + fun markBootstrapAllCompleted() { + if (bootstrapAllRecorded.compareAndSet(false, true)) { + counter( + "fint.consumer.kafka.bootstrap.completed", + listOf(Tag.of("listener", "all")), + ).increment() + timer( + "fint.consumer.kafka.bootstrap.duration", + listOf(Tag.of("listener", "all")), + ).record(System.nanoTime() - bootstrapAllStartNanos.get(), TimeUnit.NANOSECONDS) + } + } + + fun recordBootstrapEndOffsetLookupFailure(listenerId: String) { + counter( + "fint.consumer.kafka.bootstrap.end_offset.lookup.failures", + listOf(Tag.of("listener", listenerId)), + ).increment() + } + + fun registerRuntimeListener(listenerId: String) { + runtimeStates.computeIfAbsent(listenerId) { + RuntimeMetricState().also { state -> + meterRegistry.gauge( + "fint.consumer.kafka.runtime.unhealthy", + listOf(Tag.of("listener", listenerId)), + state, + ) { + if (it.isUnhealthy( + clock.millis(), + kafkaHealthProperties.runtimeGracePeriod.toMillis(), + ) + ) { + 1.0 + } else { + 0.0 + } + } + meterRegistry.gauge( + "fint.consumer.kafka.runtime.problem.duration", + listOf(Tag.of("listener", listenerId)), + state, + ) { it.problemDuration(clock.millis()).toDouble() } + } + } + } + + fun markRuntimeHealthy(listenerId: String) { + runtimeStates[listenerId]?.markHealthy(clock.millis()) + } + + fun markRuntimeProblem( + listenerId: String, + reason: String, + ) { + runtimeStates[listenerId]?.markProblem(clock.millis()) + counter( + "fint.consumer.kafka.runtime.problem", + listOf(Tag.of("listener", listenerId), Tag.of("reason", reason)), + ).increment() + } + + private fun counter( + name: String, + tags: List, + ): Counter { + return counters.computeIfAbsent(meterKey(name, tags)) { meterRegistry.counter(name, tags) } + } + + private fun timer( + name: String, + tags: List, + ): Timer { + return timers.computeIfAbsent(meterKey(name, tags)) { meterRegistry.timer(name, tags) } + } + + private fun meterKey( + name: String, + tags: List, + ): String { + return "$name|${tags.joinToString("|") { "${it.key}=${it.value}" }}" + } +} + +private class BootstrapMetricState( + val startNanos: Long, +) { + val pendingPartitions = AtomicInteger(0) + val inProgress = AtomicInteger(1) + val completed = AtomicBoolean(false) +} + +private class RuntimeMetricState { + private val problemSince = AtomicLong(0L) + + fun markHealthy(now: Long) { + problemSince.set(0L) + } + + fun markProblem(now: Long) { + problemSince.compareAndSet(0L, now) + } + + fun problemDuration(now: Long): Long { + return problemSince + .get() + .takeIf { it > 0L } + ?.let { now - it } + ?: 0L + } + + fun isUnhealthy( + now: Long, + gracePeriodMs: Long, + ): Boolean { + return problemSince + .get() + .takeIf { it > 0L } + ?.let { now - it >= gracePeriodMs } + ?: false + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt b/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt new file mode 100644 index 00000000..3a4de291 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt @@ -0,0 +1,14 @@ +package no.fintlabs.consumer.health + +import org.springframework.boot.context.properties.ConfigurationProperties +import java.time.Duration + +@ConfigurationProperties(prefix = "fint.consumer.health.kafka") +data class KafkaHealthProperties( + val idleEventInterval: Duration = Duration.ofMinutes(1), + val runtimeGracePeriod: Duration = Duration.ofMinutes(15), + val monitorIntervalSeconds: Int = 30, + val noPollThreshold: Float = 3.0f, + val bootstrapEndOffsetRefreshInterval: Duration = Duration.ofSeconds(2), + val bootstrapEndOffsetExecutorShutdownTimeout: Duration = Duration.ofSeconds(5), +) diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt b/src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt new file mode 100644 index 00000000..eba005e7 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt @@ -0,0 +1,15 @@ +package no.fintlabs.consumer.health + +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer +import org.springframework.stereotype.Component + +@Component +class KafkaListenerContainerHealthConfigurer( + private val kafkaHealthProperties: KafkaHealthProperties, +) { + fun customize(container: ConcurrentMessageListenerContainer) { + container.containerProperties.idleEventInterval = kafkaHealthProperties.idleEventInterval.toMillis() + container.containerProperties.monitorInterval = kafkaHealthProperties.monitorIntervalSeconds + container.containerProperties.noPollThreshold = kafkaHealthProperties.noPollThreshold + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt b/src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt new file mode 100644 index 00000000..d4e15e80 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt @@ -0,0 +1,9 @@ +package no.fintlabs.consumer.health + +object KafkaListenerIds { + const val ENTITY = "resourceEntityConsumerFactory" + const val REQUEST_EVENT = "requestFintEventRequestListenerContainer" + const val RESPONSE_EVENT = "responseFintEventContainerListener" + const val RELATION_UPDATE = "relationUpdateConsumerContainer" + const val AUTORELATION_ENTITY = "buildAutoRelationConsumer" +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt b/src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt new file mode 100644 index 00000000..f94705ed --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt @@ -0,0 +1,161 @@ +package no.fintlabs.consumer.health + +import org.springframework.boot.actuate.health.Health +import org.springframework.boot.actuate.health.HealthIndicator +import org.springframework.context.event.EventListener +import org.springframework.kafka.event.ConsumerFailedToStartEvent +import org.springframework.kafka.event.ConsumerStartedEvent +import org.springframework.kafka.event.ConsumerStoppedEvent +import org.springframework.kafka.event.KafkaEvent +import org.springframework.kafka.event.ListenerContainerIdleEvent +import org.springframework.kafka.event.NonResponsiveConsumerEvent +import org.springframework.kafka.listener.MessageListenerContainer +import org.springframework.stereotype.Component +import java.time.Clock +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicReference + +@Component("kafkaRuntime") +class KafkaRuntimeHealthMonitor( + private val kafkaHealthProperties: KafkaHealthProperties, + private val clock: Clock, + private val kafkaHealthMetrics: KafkaHealthMetrics, +) : HealthIndicator { + private val trackedListeners = ConcurrentHashMap.newKeySet() + private val listenerStates = ConcurrentHashMap() + + fun registerListener(listenerId: String) { + trackedListeners.add(listenerId) + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) } + kafkaHealthMetrics.registerRuntimeListener(listenerId) + } + + fun onRecordProcessed(listenerId: String) { + if (!trackedListeners.contains(listenerId)) { + return + } + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) }.markHealthy(now()) + kafkaHealthMetrics.markRuntimeHealthy(listenerId) + } + + @EventListener + fun onConsumerStarted(event: ConsumerStartedEvent) { + listenerIdOf(event)?.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) }.markHealthy(now()) + kafkaHealthMetrics.markRuntimeHealthy(listenerId) + } + } + + @EventListener + fun onListenerContainerIdle(event: ListenerContainerIdleEvent) { + event.listenerId.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) }.markHealthy(now()) + kafkaHealthMetrics.markRuntimeHealthy(listenerId) + } + } + + @EventListener + fun onNonResponsiveConsumer(event: NonResponsiveConsumerEvent) { + event.listenerId.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates + .computeIfAbsent( + listenerId, + ) { ListenerRuntimeState(now()) } + .markProblem("NON_RESPONSIVE", now()) + kafkaHealthMetrics.markRuntimeProblem(listenerId, "NON_RESPONSIVE") + } + } + + @EventListener + fun onConsumerFailedToStart(event: ConsumerFailedToStartEvent) { + listenerIdOf(event)?.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates + .computeIfAbsent( + listenerId, + ) { ListenerRuntimeState(now()) } + .markProblem("FAILED_TO_START", now()) + kafkaHealthMetrics.markRuntimeProblem(listenerId, "FAILED_TO_START") + } + } + + @EventListener + fun onConsumerStopped(event: ConsumerStoppedEvent) { + if (event.reason == ConsumerStoppedEvent.Reason.NORMAL) { + return + } + + listenerIdOf(event)?.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates + .computeIfAbsent(listenerId) { ListenerRuntimeState(now()) } + .markProblem("STOPPED_${event.reason.name}", now()) + kafkaHealthMetrics.markRuntimeProblem(listenerId, "STOPPED_${event.reason.name}") + } + } + + override fun health(): Health { + val now = now() + val unhealthyListeners = + trackedListeners + .mapNotNull { listenerId -> + listenerStates[listenerId] + ?.takeIf { it.isUnhealthy(now, kafkaHealthProperties.runtimeGracePeriod.toMillis()) } + ?.let { listenerId to it.snapshot(now) } + }.toMap() + + val builder = if (unhealthyListeners.isEmpty()) Health.up() else Health.down() + + return builder + .withDetail("trackedListeners", trackedListeners.size) + .withDetail("runtimeGracePeriodMs", kafkaHealthProperties.runtimeGracePeriod.toMillis()) + .withDetail("unhealthyListeners", unhealthyListeners) + .build() + } + + private fun listenerIdOf(event: KafkaEvent): String? { + return runCatching { + event.getContainer(MessageListenerContainer::class.java).listenerId + }.getOrNull() + } + + private fun now(): Long = clock.millis() +} + +private class ListenerRuntimeState( + initialHealthyAt: Long, +) { + private val lastHealthyAt = AtomicLong(initialHealthyAt) + private val problemSince = AtomicLong(0L) + private val problem = AtomicReference(null) + + fun markHealthy(now: Long) { + lastHealthyAt.set(now) + problemSince.set(0L) + problem.set(null) + } + + fun markProblem( + reason: String, + now: Long, + ) { + problem.compareAndSet(null, reason) + problemSince.compareAndSet(0L, now) + } + + fun isUnhealthy( + now: Long, + gracePeriodMs: Long, + ): Boolean = + problemSince + .get() + .takeIf { it > 0L } + ?.let { now - it >= gracePeriodMs } + ?: false + + fun snapshot(now: Long): Map = + mapOf( + "problem" to problem.get(), + "problemDurationMs" to (now - problemSince.get()), + "lastHealthyAtMs" to lastHealthyAt.get(), + ) +} diff --git a/src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt b/src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt new file mode 100644 index 00000000..8aff7fa3 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt @@ -0,0 +1,10 @@ +package no.fintlabs.consumer.health + +data class ListenerBootstrapStatus( + val listenerId: String, + val assignmentSeen: Boolean, + val completed: Boolean, + val assignedPartitions: Int, + val caughtUpPartitions: Int, + val partitions: List, +) diff --git a/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt index aa03d8bc..1cb81ef0 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt @@ -1,6 +1,10 @@ package no.fintlabs.consumer.kafka.entity import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConstants.RESOURCE_NAME import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.applyConsumerFetchSettings @@ -13,7 +17,6 @@ import no.novari.kafka.consuming.ParameterizedListenerContainerFactoryService import no.novari.kafka.topic.name.EntityTopicNamePatternParameters import no.novari.kafka.topic.name.TopicNamePatternParameterPattern import no.novari.kafka.topic.name.TopicNamePatternPrefixParameters -import no.novari.metamodel.MetamodelService import org.apache.kafka.clients.consumer.ConsumerRecord import org.slf4j.LoggerFactory import org.springframework.context.annotation.Bean @@ -25,19 +28,24 @@ class EntityConsumer( private val entityProcessingService: EntityProcessingService, private val consumerConfig: ConsumerConfiguration, private val resourceConverter: ResourceConverter, - private val metamodelService: MetamodelService, + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(EntityConsumer::class.java) private const val CONSUMER_NAME = "entity" } - @Bean + @Bean(name = [KafkaListenerIds.ENTITY]) fun resourceEntityConsumerFactory( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + initialKafkaBootstrapTracker.registerBlockingListener(KafkaListenerIds.ENTITY) + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.ENTITY) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( Any::class.java, this::consumeRecord, @@ -46,8 +54,11 @@ class EntityConsumer( .groupIdApplicationDefaultWithUniqueSuffix() .maxPollRecordsKafkaDefault() .maxPollIntervalKafkaDefault() - .seekToBeginningOnAssignment() - .build(), + .seekToBeginningAndPerformOperationOnAssignment { assignments -> + initialKafkaBootstrapTracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, assignments.keys) + }.onRevocation { partitions -> + initialKafkaBootstrapTracker.onPartitionsRevoked(KafkaListenerIds.ENTITY, partitions) + }.build(), errorHandlerFactory.createErrorHandler( KafkaConsumerErrorHandling.createLoggingErrorHandlerConfiguration( logger, @@ -58,6 +69,7 @@ class EntityConsumer( container.concurrency = consumerConfig.kafka.entityConcurrency container.containerProperties.idleBetweenPolls = consumerConfig.kafka.idleBetweenPolls container.applyConsumerFetchSettings(consumerConfig.kafka) + kafkaListenerContainerHealthConfigurer.customize(container) container.applyStartupJitter(consumerConfig.kafka) }, ).createContainer( @@ -69,13 +81,21 @@ class EntityConsumer( .orgId(TopicNamePatternParameterPattern.exactly(consumerConfig.orgId.asTopicSegment)) .domainContextApplicationDefault() .build(), - ).resource(TopicNamePatternParameterPattern.anyOf(componentTopic(), *legacyResourceTopics())) - .build(), + ).resource( + TopicNamePatternParameterPattern.exactly( + "${consumerConfig.domain}-${consumerConfig.packageName}", + ), + ).build(), ) + } fun consumeRecord(consumerRecord: ConsumerRecord) = createEntityConsumerRecord(consumerRecord) - .let { entityProcessingService.processEntityConsumerRecord(it) } + .let { entityConsumerRecord -> + entityProcessingService.processEntityConsumerRecord(entityConsumerRecord) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.ENTITY, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.ENTITY) + } private fun createEntityConsumerRecord(consumerRecord: ConsumerRecord) = consumerRecord.getResourceName().let { resourceName -> @@ -87,20 +107,5 @@ class EntityConsumer( } private fun ConsumerRecord.getResourceName(): String = - if (consumerConfig.kafka.consumeLegacyResourceTopics) { - headers().stringValue(RESOURCE_NAME) ?: topic().split("-").last() - } else { - headers().stringValue(RESOURCE_NAME) ?: throw IllegalArgumentException("Resource name header not found") - } - - private fun componentTopic() = "${consumerConfig.domain}-${consumerConfig.packageName}" - - private fun legacyResourceTopics(): Array { - if (!consumerConfig.kafka.consumeLegacyResourceTopics) return emptyArray() - return metamodelService - .getComponent(consumerConfig.domain, consumerConfig.packageName)!! - .resources - .map { resource -> "${consumerConfig.domain}-${consumerConfig.packageName}-${resource.name}" } - .toTypedArray() - } + headers().stringValue(RESOURCE_NAME) ?: throw IllegalArgumentException("Resource name header not found") } diff --git a/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt index b90f691b..641d3b45 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt @@ -2,6 +2,10 @@ package no.fintlabs.consumer.kafka.event import no.fintlabs.adapter.models.event.ResponseFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.applyConsumerFetchSettings import no.fintlabs.consumer.kafka.applyStartupJitter @@ -21,13 +25,19 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class EventResponseConsumer( private val consumerConfig: ConsumerConfiguration, private val eventStatusCache: EventStatusCache, + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { - @Bean + @Bean(name = [KafkaListenerIds.RESPONSE_EVENT]) fun responseFintEventContainerListener( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + initialKafkaBootstrapTracker.registerBlockingListener(KafkaListenerIds.RESPONSE_EVENT) + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.RESPONSE_EVENT) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( ResponseFintEvent::class.java, this::consumeRecord, @@ -36,8 +46,14 @@ class EventResponseConsumer( .groupIdApplicationDefaultWithUniqueSuffix() .maxPollRecordsKafkaDefault() .maxPollIntervalKafkaDefault() - .seekToBeginningOnAssignment() - .build(), + .seekToBeginningAndPerformOperationOnAssignment { assignments -> + initialKafkaBootstrapTracker.onPartitionsAssigned( + KafkaListenerIds.RESPONSE_EVENT, + assignments.keys, + ) + }.onRevocation { partitions -> + initialKafkaBootstrapTracker.onPartitionsRevoked(KafkaListenerIds.RESPONSE_EVENT, partitions) + }.build(), errorHandlerFactory.createErrorHandler( KafkaConsumerErrorHandling.createLoggingErrorHandlerConfiguration( logger, @@ -48,6 +64,7 @@ class EventResponseConsumer( container.concurrency = consumerConfig.kafka.responseConcurrency container.containerProperties.idleBetweenPolls = consumerConfig.kafka.idleBetweenPolls container.applyConsumerFetchSettings(consumerConfig.kafka) + kafkaListenerContainerHealthConfigurer.customize(container) container.applyStartupJitter(consumerConfig.kafka) }, ).createContainer( @@ -62,10 +79,13 @@ class EventResponseConsumer( ).eventName("${consumerConfig.domain}-${consumerConfig.packageName}-response") .build(), ) + } private fun consumeRecord(consumerRecord: ConsumerRecord) { logger.info("Received Response: {}", consumerRecord.value()) eventStatusCache.trackResponse(consumerRecord.value().corrId, consumerRecord.value()) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.RESPONSE_EVENT, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RESPONSE_EVENT) } companion object { diff --git a/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt index 79b17c44..598626fb 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt @@ -2,6 +2,10 @@ package no.fintlabs.consumer.kafka.event import no.fintlabs.adapter.models.event.RequestFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.applyConsumerFetchSettings import no.fintlabs.consumer.kafka.applyStartupJitter @@ -21,18 +25,24 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class RequestFintEventConsumer( private val consumerConfig: ConsumerConfiguration, private val eventStatusCache: EventStatusCache, + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(RequestFintEventConsumer::class.java) private const val CONSUMER_NAME = "request-fint-event" } - @Bean + @Bean(name = [KafkaListenerIds.REQUEST_EVENT]) fun requestFintEventRequestListenerContainer( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + initialKafkaBootstrapTracker.registerBlockingListener(KafkaListenerIds.REQUEST_EVENT) + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.REQUEST_EVENT) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( RequestFintEvent::class.java, this::consumeRecord, @@ -41,8 +51,14 @@ class RequestFintEventConsumer( .groupIdApplicationDefaultWithUniqueSuffix() .maxPollRecordsKafkaDefault() .maxPollIntervalKafkaDefault() - .seekToBeginningOnAssignment() - .build(), + .seekToBeginningAndPerformOperationOnAssignment { assignments -> + initialKafkaBootstrapTracker.onPartitionsAssigned( + KafkaListenerIds.REQUEST_EVENT, + assignments.keys, + ) + }.onRevocation { partitions -> + initialKafkaBootstrapTracker.onPartitionsRevoked(KafkaListenerIds.REQUEST_EVENT, partitions) + }.build(), errorHandlerFactory.createErrorHandler( KafkaConsumerErrorHandling.createLoggingErrorHandlerConfiguration( logger, @@ -53,6 +69,7 @@ class RequestFintEventConsumer( container.concurrency = consumerConfig.kafka.requestConcurrency container.containerProperties.idleBetweenPolls = consumerConfig.kafka.idleBetweenPolls container.applyConsumerFetchSettings(consumerConfig.kafka) + kafkaListenerContainerHealthConfigurer.customize(container) container.applyStartupJitter(consumerConfig.kafka) }, ).createContainer( @@ -67,9 +84,12 @@ class RequestFintEventConsumer( ).eventName("${consumerConfig.domain}-${consumerConfig.packageName}-request") .build(), ) + } private fun consumeRecord(consumerRecord: ConsumerRecord) { logger.info("Received Request: {}", consumerRecord.key()) eventStatusCache.trackRequest(consumerRecord.value()) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.REQUEST_EVENT, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.REQUEST_EVENT) } } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 10f238a8..ec2aca2c 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -5,8 +5,12 @@ fint: exposed-endpoints: - /actuator/prometheus - /actuator/health + - /actuator/health/readiness + - /actuator/health/liveness + relation: base-url: https://api.felleskomponent.no + consumer: pod-url: http://fint-core-consumer-${fint.consumer.domain}-${fint.consumer.package}:8080 base-url: ${fint.relation.base-url} @@ -15,6 +19,12 @@ fint: packageName: ${fint.consumer.package} org-id: ${fint.org-id} coreVersionHeader: 2 + health: + kafka: + idle-event-interval: 1m + runtime-grace-period: 15m + monitor-interval-seconds: 30 + no-poll-threshold: 3.0 kafka: entity-concurrency: 6 relation-concurrency: 6 @@ -40,6 +50,16 @@ spring: base-path: ${fint.consumer.domain}/${fint.consumer.package} management: + endpoint: + health: + probes: + enabled: true + group: + readiness: + include: readinessState,initialKafkaBootstrap + liveness: + include: livenessState,kafkaRuntime + endpoints: web: exposure: @@ -49,9 +69,3 @@ logging: level: org.apache.kafka: WARN no.novari.kafka: WARN -# org.apache.kafka.clients.NetworkClient: INFO -# org.apache.kafka.clients.consumer.internals.AbstractCoordinator: DEBUG -# org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: DEBUG -# org.apache.kafka.clients.consumer.internals.Fetcher: DEBUG -# org.springframework.kafka.listener: DEBUG -# no.fintlabs.kafka: DEBUG diff --git a/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt new file mode 100644 index 00000000..0c50fff9 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt @@ -0,0 +1,29 @@ +package no.fintlabs.consumer.health + +import io.mockk.every +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.springframework.boot.actuate.health.Status + +class InitialKafkaBootstrapHealthIndicatorTest { + private val tracker: InitialKafkaBootstrapTracker = mockk() + + @Test + fun `should report out of service while bootstrap is incomplete`() { + every { tracker.snapshot() } returns BootstrapReadinessSnapshot(false, emptyList()) + + val health = InitialKafkaBootstrapHealthIndicator(tracker).health() + + assertEquals(Status.OUT_OF_SERVICE, health.status) + } + + @Test + fun `should report up when bootstrap is complete`() { + every { tracker.snapshot() } returns BootstrapReadinessSnapshot(true, emptyList()) + + val health = InitialKafkaBootstrapHealthIndicator(tracker).health() + + assertEquals(Status.UP, health.status) + } +} diff --git a/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt new file mode 100644 index 00000000..c757df6c --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt @@ -0,0 +1,275 @@ +package no.fintlabs.consumer.health + +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.springframework.context.ApplicationContext +import java.time.Duration +import java.time.Instant +import java.util.concurrent.TimeoutException + +class InitialKafkaBootstrapTrackerTest { + private val endOffsetProvider: EndOffsetProvider = mockk() + private val applicationContext: ApplicationContext = mockk(relaxed = true) + private val meterRegistry = SimpleMeterRegistry() + private val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + private val kafkaHealthMetrics = KafkaHealthMetrics(meterRegistry, clock, KafkaHealthProperties()) + + private lateinit var tracker: InitialKafkaBootstrapTracker + + @BeforeEach + fun setUp() { + tracker = + InitialKafkaBootstrapTracker( + endOffsetProvider = endOffsetProvider, + applicationContext = applicationContext, + kafkaHealthMetrics = kafkaHealthMetrics, + kafkaHealthProperties = KafkaHealthProperties(), + ) + tracker.registerBlockingListener(KafkaListenerIds.ENTITY) + } + + @Test + fun `should stay unready until all assigned partitions catch up`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition0, partition1)) } returns + mapOf(partition0 to 2L, partition1 to 1L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0, partition1)) + tracker.refreshPendingEndOffsets() + + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 1, 0L, "key", "value")) + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 1L, "key", "value")) + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should become ready immediately when assigned partitions are empty at startup offset`() { + val partition0 = TopicPartition("topic", 0) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } returns + mapOf(partition0 to 0L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + tracker.refreshPendingEndOffsets() + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should ignore new assignments after initial bootstrap has completed`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } returns mapOf(partition0 to 1L) + every { endOffsetProvider.latestOffsets(setOf(partition1)) } returns mapOf(partition1 to 2L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + tracker.refreshPendingEndOffsets() + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + + assertTrue(tracker.snapshot().ready) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition1)) + tracker.refreshPendingEndOffsets() + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should wait for both entity and relation update listeners`() { + val entityPartition = TopicPartition("entity-topic", 0) + val relationPartition = TopicPartition("relation-topic", 0) + + tracker.registerBlockingListener(KafkaListenerIds.RELATION_UPDATE) + + every { endOffsetProvider.latestOffsets(setOf(entityPartition)) } returns mapOf(entityPartition to 1L) + every { endOffsetProvider.latestOffsets(setOf(relationPartition)) } returns mapOf(relationPartition to 1L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(entityPartition)) + tracker.refreshPendingEndOffsets() + tracker.onPartitionsAssigned(KafkaListenerIds.RELATION_UPDATE, setOf(relationPartition)) + tracker.refreshPendingEndOffsets() + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("entity-topic", 0, 0L, "key", "value")) + + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed( + KafkaListenerIds.RELATION_UPDATE, + ConsumerRecord("relation-topic", 0, 0L, "key", "value"), + ) + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should publish bootstrap metrics for completion and pending partitions`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition0, partition1)) } returns + mapOf(partition0 to 2L, partition1 to 1L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0, partition1)) + tracker.refreshPendingEndOffsets() + + assertEquals( + 2.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.partitions.pending") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.state") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + + clock.advance(Duration.ofSeconds(2)) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 1, 0L, "key", "value")) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 1L, "key", "value")) + + assertEquals( + 0.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.partitions.pending") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + 0.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.state") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.completed") + .tag("listener", KafkaListenerIds.ENTITY) + .counter() + .count(), + ) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.completed") + .tag("listener", "all") + .counter() + .count(), + ) + } + + @Test + fun `should count end offset lookup failures and recover on retry`() { + val partition0 = TopicPartition("topic", 0) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } throws + RuntimeException("boom") andThen mapOf(partition0 to 1L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + tracker.refreshPendingEndOffsets() + + assertFalse(tracker.snapshot().ready) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.end_offset.lookup.failures") + .tag("listener", KafkaListenerIds.ENTITY) + .counter() + .count(), + ) + + tracker.refreshPendingEndOffsets() + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should keep consumer alive when end offset lookup throws checked exception`() { + val partition0 = TopicPartition("topic", 0) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } throws + TimeoutException("kafka admin timeout") andThen mapOf(partition0 to 0L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + tracker.refreshPendingEndOffsets() + + assertFalse(tracker.snapshot().ready) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.end_offset.lookup.failures") + .tag("listener", KafkaListenerIds.ENTITY) + .counter() + .count(), + ) + + tracker.refreshPendingEndOffsets() + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should preserve records processed before end offset arrives`() { + val partition0 = TopicPartition("topic", 0) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } returns mapOf(partition0 to 6L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 5L, "key", "value")) + + assertFalse(tracker.snapshot().ready) + + tracker.refreshPendingEndOffsets() + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should skip end offset lookup for partitions revoked before refresh`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition1)) } returns mapOf(partition1 to 0L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0, partition1)) + tracker.onPartitionsRevoked(KafkaListenerIds.ENTITY, listOf(partition0)) + tracker.refreshPendingEndOffsets() + + assertTrue(tracker.snapshot().ready) + verify { endOffsetProvider.latestOffsets(setOf(partition1)) } + } + + @Test + fun `should not call end offset provider when no partitions are pending`() { + tracker.refreshPendingEndOffsets() + verify(exactly = 0) { endOffsetProvider.latestOffsets(any()) } + } +} diff --git a/src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt b/src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt new file mode 100644 index 00000000..4b6bd637 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt @@ -0,0 +1,141 @@ +package no.fintlabs.consumer.health + +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import io.mockk.mockk +import org.apache.kafka.clients.consumer.Consumer +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.springframework.boot.actuate.health.Status +import org.springframework.kafka.event.NonResponsiveConsumerEvent +import java.time.Duration +import java.time.Instant + +class KafkaRuntimeHealthMonitorTest { + @Test + fun `should stay up during grace period for non responsive consumer`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val monitor = + KafkaRuntimeHealthMonitor( + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + clock, + KafkaHealthMetrics( + SimpleMeterRegistry(), + clock, + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + ), + ) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onRecordProcessed(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + + clock.advance(Duration.ofMinutes(14)) + + assertEquals(Status.UP, monitor.health().status) + } + + @Test + fun `should go down after grace period for non responsive consumer`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val monitor = + KafkaRuntimeHealthMonitor( + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + clock, + KafkaHealthMetrics( + SimpleMeterRegistry(), + clock, + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + ), + ) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onRecordProcessed(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + + clock.advance(Duration.ofMinutes(16)) + + assertEquals(Status.DOWN, monitor.health().status) + } + + @Test + fun `should recover after healthy activity resumes`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val monitor = + KafkaRuntimeHealthMonitor( + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + clock, + KafkaHealthMetrics( + SimpleMeterRegistry(), + clock, + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + ), + ) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + clock.advance(Duration.ofMinutes(5)) + + monitor.onRecordProcessed(KafkaListenerIds.ENTITY) + clock.advance(Duration.ofMinutes(20)) + + assertEquals(Status.UP, monitor.health().status) + } + + @Test + fun `should publish runtime metrics for problem and unhealthy state`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val meterRegistry = SimpleMeterRegistry() + val properties = KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)) + val monitor = KafkaRuntimeHealthMonitor(properties, clock, KafkaHealthMetrics(meterRegistry, clock, properties)) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.runtime.problem") + .tag("listener", KafkaListenerIds.ENTITY) + .tag("reason", "NON_RESPONSIVE") + .counter() + .count(), + ) + assertEquals( + 0.0, + meterRegistry + .get("fint.consumer.kafka.runtime.unhealthy") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + + clock.advance(Duration.ofMinutes(16)) + + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.runtime.unhealthy") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + Duration.ofMinutes(16).toMillis().toDouble(), + meterRegistry + .get("fint.consumer.kafka.runtime.problem.duration") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + } +} + +private fun nonResponsiveConsumerEvent(listenerId: String): NonResponsiveConsumerEvent = + NonResponsiveConsumerEvent( + Any(), + Any(), + 1_000L, + listenerId, + emptyList(), + mockk>(relaxed = true), + ) diff --git a/src/test/java/no/fintlabs/consumer/health/MutableClock.kt b/src/test/java/no/fintlabs/consumer/health/MutableClock.kt new file mode 100644 index 00000000..7dd866f5 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/MutableClock.kt @@ -0,0 +1,21 @@ +package no.fintlabs.consumer.health + +import java.time.Clock +import java.time.Duration +import java.time.Instant +import java.time.ZoneId +import java.time.ZoneOffset + +class MutableClock( + private var instant: Instant, +) : Clock() { + override fun getZone(): ZoneId = ZoneOffset.UTC + + override fun withZone(zone: ZoneId): Clock = this + + override fun instant(): Instant = instant + + fun advance(duration: Duration) { + instant = instant.plus(duration) + } +} diff --git a/src/test/java/no/fintlabs/consumer/kafka/entity/EntityConsumerTest.kt b/src/test/java/no/fintlabs/consumer/kafka/entity/EntityConsumerTest.kt index bda560ec..e9ee11b9 100644 --- a/src/test/java/no/fintlabs/consumer/kafka/entity/EntityConsumerTest.kt +++ b/src/test/java/no/fintlabs/consumer/kafka/entity/EntityConsumerTest.kt @@ -7,6 +7,9 @@ import io.mockk.verify import no.fintlabs.consumer.config.ConsumerConfiguration import no.fintlabs.consumer.config.KafkaConfiguration import no.fintlabs.consumer.config.OrgId +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConstants.LAST_MODIFIED import no.fintlabs.consumer.kafka.KafkaConstants.RESOURCE_NAME import no.fintlabs.consumer.resource.ResourceConverter @@ -16,9 +19,6 @@ import no.novari.kafka.consuming.ParameterizedListenerContainerFactory import no.novari.kafka.consuming.ParameterizedListenerContainerFactoryService import no.novari.kafka.topic.name.EntityTopicNamePatternParameters import no.novari.kafka.topic.name.TopicNamePatternParameters -import no.novari.metamodel.MetamodelService -import no.novari.metamodel.model.Component -import no.novari.metamodel.model.Resource import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord.NULL_SIZE @@ -42,11 +42,13 @@ class EntityConsumerTest { private lateinit var entityProcessingService: EntityProcessingService private lateinit var consumerConfig: ConsumerConfiguration private lateinit var resourceConverter: ResourceConverter - private lateinit var metamodelService: MetamodelService private lateinit var factoryService: ParameterizedListenerContainerFactoryService private lateinit var errorHandlerFactory: ErrorHandlerFactory private lateinit var factory: ParameterizedListenerContainerFactory private lateinit var container: ConcurrentMessageListenerContainer + private lateinit var initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker + private lateinit var kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor + private lateinit var kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer private lateinit var entityConsumer: EntityConsumer @BeforeEach @@ -54,10 +56,12 @@ class EntityConsumerTest { entityProcessingService = mockk(relaxed = true) consumerConfig = mockk() resourceConverter = mockk(relaxed = true) - metamodelService = mockk() factoryService = mockk() errorHandlerFactory = mockk(relaxed = true) factory = mockk() + initialKafkaBootstrapTracker = mockk(relaxed = true) + kafkaRuntimeHealthMonitor = mockk(relaxed = true) + kafkaListenerContainerHealthConfigurer = mockk(relaxed = true) container = ConcurrentMessageListenerContainer( mockk>(relaxed = true), @@ -79,12 +83,20 @@ class EntityConsumerTest { } returns factory every { factory.createContainer(any()) } returns container - entityConsumer = EntityConsumer(entityProcessingService, consumerConfig, resourceConverter, metamodelService) + entityConsumer = + EntityConsumer( + entityProcessingService, + consumerConfig, + resourceConverter, + initialKafkaBootstrapTracker, + kafkaRuntimeHealthMonitor, + kafkaListenerContainerHealthConfigurer, + ) } @Test - fun `when consumeLegacyResourceTopics is disabled, only component topic is consumed`() { - every { consumerConfig.kafka } returns KafkaConfiguration(consumeLegacyResourceTopics = false) + fun `only component topic is consumed`() { + every { consumerConfig.kafka } returns KafkaConfiguration() val captured = slot() every { factory.createContainer(capture(captured)) } returns container @@ -98,33 +110,6 @@ class EntityConsumerTest { assertEquals(listOf("utdanning-vurdering"), resourcePattern.anyOfValues) } - @Test - fun `when consumeLegacyResourceTopics is enabled, component topic and one topic per resource are consumed`() { - every { consumerConfig.kafka } returns KafkaConfiguration(consumeLegacyResourceTopics = true) - - val component = mockk() - val resource1 = mockk() - val resource2 = mockk() - every { resource1.name } returns "elevfravar" - every { resource2.name } returns "eksamenskarakter" - every { component.resources } returns listOf(resource1, resource2) - every { metamodelService.getComponent("utdanning", "vurdering") } returns component - - val captured = slot() - every { factory.createContainer(capture(captured)) } returns container - - entityConsumer.resourceEntityConsumerFactory(factoryService, errorHandlerFactory) - - val resourcePattern = - captured.captured.topicNamePatternSuffixParameters - .first() - .pattern - assertEquals(3, resourcePattern.anyOfValues.size) - assertTrue(resourcePattern.anyOfValues.contains("utdanning-vurdering")) - assertTrue(resourcePattern.anyOfValues.contains("utdanning-vurdering-elevfravar")) - assertTrue(resourcePattern.anyOfValues.contains("utdanning-vurdering-eksamenskarakter")) - } - @Test fun `container gets fetch and idle settings from consumer configuration`() { every { @@ -163,8 +148,8 @@ class EntityConsumerTest { } @Test - fun `when consumeLegacyResourceTopics is disabled and header is present, resource name is read from header`() { - every { consumerConfig.kafka } returns KafkaConfiguration(consumeLegacyResourceTopics = false) + fun `when header is present, resource name is read from header`() { + every { consumerConfig.kafka } returns KafkaConfiguration() val captured = slot() every { entityProcessingService.processEntityConsumerRecord(capture(captured)) } returns Unit @@ -180,49 +165,14 @@ class EntityConsumerTest { } @Test - fun `when consumeLegacyResourceTopics is disabled and header is missing, an exception is thrown`() { - every { consumerConfig.kafka } returns KafkaConfiguration(consumeLegacyResourceTopics = false) + fun `when header is missing, an exception is thrown`() { + every { consumerConfig.kafka } returns KafkaConfiguration() assertThrows { entityConsumer.consumeRecord(createConsumerRecord(topic = "utdanning-vurdering", resourceNameHeader = null)) } } - @Test - fun `when consumeLegacyResourceTopics is enabled and header is present, resource name is read from header`() { - every { consumerConfig.kafka } returns KafkaConfiguration(consumeLegacyResourceTopics = true) - - val captured = slot() - every { entityProcessingService.processEntityConsumerRecord(capture(captured)) } returns Unit - - entityConsumer.consumeRecord( - createConsumerRecord( - topic = "utdanning-vurdering-elevfravar", - resourceNameHeader = "elevfravar", - ), - ) - - assertEquals("elevfravar", captured.captured.resourceName) - } - - @Suppress("ktlint:standard:max-line-length") - @Test - fun `when consumeLegacyResourceTopics is enabled and header is missing, resource name falls back to last topic segment`() { - every { consumerConfig.kafka } returns KafkaConfiguration(consumeLegacyResourceTopics = true) - - val captured = slot() - every { entityProcessingService.processEntityConsumerRecord(capture(captured)) } returns Unit - - entityConsumer.consumeRecord( - createConsumerRecord( - topic = "utdanning-vurdering-elevfravar", - resourceNameHeader = null, - ), - ) - - assertEquals("elevfravar", captured.captured.resourceName) - } - @Test fun `listener configuration seeks to beginning on partition assignment`() { val config = captureListenerConfig() diff --git a/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt b/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt index c5b02b2f..e95bc70b 100644 --- a/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt +++ b/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt @@ -11,6 +11,9 @@ import no.fintlabs.autorelation.model.createEntityDescriptor import no.fintlabs.consumer.config.ConsumerConfiguration import no.fintlabs.consumer.config.KafkaConfiguration import no.fintlabs.consumer.config.OrgId +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaThroughputMetrics import no.novari.kafka.consuming.ErrorHandlerFactory import no.novari.kafka.consuming.ListenerConfiguration @@ -21,10 +24,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow import org.springframework.kafka.listener.ConcurrentMessageListenerContainer import org.springframework.kafka.listener.ConsumerSeekAware -import java.util.UUID import java.util.function.Consumer import kotlin.test.assertTrue @@ -35,6 +36,9 @@ class RelationUpdateConsumerTest { private lateinit var consumerRecord: ConsumerRecord private lateinit var relationUpdate: RelationUpdate private lateinit var kafkaThroughputMetrics: KafkaThroughputMetrics + private lateinit var initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker + private lateinit var kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor + private lateinit var kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer @BeforeEach fun setUp() { @@ -47,7 +51,18 @@ class RelationUpdateConsumerTest { } kafkaThroughputMetrics = mockk(relaxed = true) - relationUpdateConsumer = RelationUpdateConsumer(autoRelationService, consumerConfig, kafkaThroughputMetrics) + initialKafkaBootstrapTracker = mockk(relaxed = true) + kafkaRuntimeHealthMonitor = mockk(relaxed = true) + kafkaListenerContainerHealthConfigurer = mockk(relaxed = true) + relationUpdateConsumer = + RelationUpdateConsumer( + autoRelationService, + consumerConfig, + kafkaThroughputMetrics, + initialKafkaBootstrapTracker, + kafkaRuntimeHealthMonitor, + kafkaListenerContainerHealthConfigurer, + ) } @Test diff --git a/src/test/java/no/fintlabs/consumer/kafka/event/EventResponseConsumerTest.kt b/src/test/java/no/fintlabs/consumer/kafka/event/EventResponseConsumerTest.kt index 7ec5d0a5..32ebeca4 100644 --- a/src/test/java/no/fintlabs/consumer/kafka/event/EventResponseConsumerTest.kt +++ b/src/test/java/no/fintlabs/consumer/kafka/event/EventResponseConsumerTest.kt @@ -8,6 +8,9 @@ import no.fintlabs.adapter.models.event.ResponseFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration import no.fintlabs.consumer.config.KafkaConfiguration import no.fintlabs.consumer.config.OrgId +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.resource.event.EventStatusCache import no.novari.kafka.consuming.ErrorHandlerFactory import no.novari.kafka.consuming.ListenerConfiguration @@ -30,6 +33,9 @@ class EventResponseConsumerTest { private lateinit var errorHandlerFactory: ErrorHandlerFactory private lateinit var factory: ParameterizedListenerContainerFactory private lateinit var container: ConcurrentMessageListenerContainer + private lateinit var initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker + private lateinit var kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor + private lateinit var kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer private lateinit var eventResponseConsumer: EventResponseConsumer @BeforeEach @@ -40,6 +46,9 @@ class EventResponseConsumerTest { errorHandlerFactory = mockk(relaxed = true) factory = mockk() container = mockk(relaxed = true) + initialKafkaBootstrapTracker = mockk(relaxed = true) + kafkaRuntimeHealthMonitor = mockk(relaxed = true) + kafkaListenerContainerHealthConfigurer = mockk(relaxed = true) every { consumerConfig.orgId } returns OrgId.from("foo.bar") every { consumerConfig.domain } returns "utdanning" @@ -57,7 +66,14 @@ class EventResponseConsumerTest { } returns factory every { factory.createContainer(any()) } returns container - eventResponseConsumer = EventResponseConsumer(consumerConfig, eventStatusCache) + eventResponseConsumer = + EventResponseConsumer( + consumerConfig, + eventStatusCache, + initialKafkaBootstrapTracker, + kafkaRuntimeHealthMonitor, + kafkaListenerContainerHealthConfigurer, + ) } @Test diff --git a/src/test/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumerTest.kt b/src/test/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumerTest.kt index 93759c8f..735df0e0 100644 --- a/src/test/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumerTest.kt +++ b/src/test/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumerTest.kt @@ -8,6 +8,9 @@ import no.fintlabs.adapter.models.event.RequestFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration import no.fintlabs.consumer.config.KafkaConfiguration import no.fintlabs.consumer.config.OrgId +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.resource.event.EventStatusCache import no.novari.kafka.consuming.ErrorHandlerFactory import no.novari.kafka.consuming.ListenerConfiguration @@ -18,10 +21,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow import org.springframework.kafka.listener.ConcurrentMessageListenerContainer import org.springframework.kafka.listener.ConsumerSeekAware -import java.util.UUID import java.util.function.Consumer import kotlin.test.assertTrue @@ -33,6 +34,9 @@ class RequestFintEventConsumerTest { private lateinit var factory: ParameterizedListenerContainerFactory private lateinit var container: ConcurrentMessageListenerContainer private lateinit var requestFintEventConsumer: RequestFintEventConsumer + private lateinit var initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker + private lateinit var kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor + private lateinit var kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer @BeforeEach fun setUp() { @@ -42,6 +46,9 @@ class RequestFintEventConsumerTest { errorHandlerFactory = mockk(relaxed = true) factory = mockk() container = mockk(relaxed = true) + initialKafkaBootstrapTracker = mockk(relaxed = true) + kafkaRuntimeHealthMonitor = mockk(relaxed = true) + kafkaListenerContainerHealthConfigurer = mockk(relaxed = true) every { consumerConfig.orgId } returns OrgId.from("foo.bar") every { consumerConfig.domain } returns "utdanning" @@ -59,7 +66,14 @@ class RequestFintEventConsumerTest { } returns factory every { factory.createContainer(any()) } returns container - requestFintEventConsumer = RequestFintEventConsumer(consumerConfig, eventStatusCache) + requestFintEventConsumer = + RequestFintEventConsumer( + consumerConfig, + eventStatusCache, + initialKafkaBootstrapTracker, + kafkaRuntimeHealthMonitor, + kafkaListenerContainerHealthConfigurer, + ) } @Test