From 33ca016a06652d83f0134b5b61dbe99874e5fa65 Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 24 May 2026 18:54:11 +0300 Subject: [PATCH 1/9] Fix command-service KafkaTemplate wiring --- .../command/config/KafkaProducerConfig.java | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java diff --git a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java new file mode 100644 index 0000000..e806043 --- /dev/null +++ b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java @@ -0,0 +1,47 @@ +package lt.satsyuk.distributed.audit.command.config; + +import lt.satsyuk.distributed.audit.event.AuditEvent; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Explicit producer bean for {@code KafkaTemplate}. + * + *

Spring Boot 4 may otherwise expose only a raw {@code KafkaTemplate} + * bean, which does not satisfy the strongly-typed constructor dependency. + */ +@Configuration +public class KafkaProducerConfig { + + private static final String JSON_SERIALIZER_CLASS = "org.springframework.kafka.support.serializer.JsonSerializer"; + private static final String JSON_ADD_TYPE_HEADERS_CONFIG = "spring.json.add.type.headers"; + + @Bean + public ProducerFactory auditEventProducerFactory( + @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers + ) { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JSON_SERIALIZER_CLASS); + props.put(JSON_ADD_TYPE_HEADERS_CONFIG, false); + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + public KafkaTemplate auditEventKafkaTemplate( + ProducerFactory auditEventProducerFactory + ) { + return new KafkaTemplate<>(auditEventProducerFactory); + } +} + From 4699214d63b4757734c70031e4665b0f30684604 Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 24 May 2026 19:04:29 +0300 Subject: [PATCH 2/9] Address review feedback for KafkaProducerConfig --- .../audit/command/config/KafkaProducerConfig.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java index e806043..2de5eb4 100644 --- a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java +++ b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java @@ -6,9 +6,11 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JacksonJsonSerializer; import java.util.HashMap; import java.util.Map; @@ -22,22 +24,23 @@ @Configuration public class KafkaProducerConfig { - private static final String JSON_SERIALIZER_CLASS = "org.springframework.kafka.support.serializer.JsonSerializer"; private static final String JSON_ADD_TYPE_HEADERS_CONFIG = "spring.json.add.type.headers"; @Bean + @Primary public ProducerFactory auditEventProducerFactory( - @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers + @Value("${spring.kafka.bootstrap-servers:localhost:9092}") String bootstrapServers ) { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JSON_SERIALIZER_CLASS); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonJsonSerializer.class); props.put(JSON_ADD_TYPE_HEADERS_CONFIG, false); return new DefaultKafkaProducerFactory<>(props); } @Bean + @Primary public KafkaTemplate auditEventKafkaTemplate( ProducerFactory auditEventProducerFactory ) { From cd391f5545622cd0823df0ae0b4a6e2c6d33e364 Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 24 May 2026 19:16:49 +0300 Subject: [PATCH 3/9] [#154] Add SpringBootTest for KafkaProducerConfig bean wiring --- .../KafkaProducerConfigIntegrationTest.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java diff --git a/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java b/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java new file mode 100644 index 0000000..ef7b6b5 --- /dev/null +++ b/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java @@ -0,0 +1,43 @@ +package lt.satsyuk.distributed.audit.command.config; + +import lt.satsyuk.distributed.audit.event.AuditEvent; +import lt.satsyuk.distributed.audit.command.service.AuditCommandPublisher; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.context.TestPropertySource; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Verifies that {@link KafkaProducerConfig} correctly registers a strongly-typed + * {@code KafkaTemplate} bean and that {@link AuditCommandPublisher} + * is wired successfully from the real application context. + */ +@SpringBootTest +@EmbeddedKafka(partitions = 1, topics = {"user.login.events"}) +@TestPropertySource(properties = { + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}" +}) +class KafkaProducerConfigIntegrationTest { + + @Autowired + private KafkaTemplate auditEventKafkaTemplate; + + @Autowired + private AuditCommandPublisher auditCommandPublisher; + + @Test + void auditEventKafkaTemplateBeanIsCreated() { + assertThat(auditEventKafkaTemplate).isNotNull(); + } + + @Test + void auditCommandPublisherBeanIsCreated() { + assertThat(auditCommandPublisher).isNotNull(); + } +} + From bbb464043287e77b840446b8cb5c1ab6aab62d30 Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 24 May 2026 19:54:14 +0300 Subject: [PATCH 4/9] [#154] Align Kafka producer wiring with Spring config --- .../command/config/KafkaProducerConfig.java | 33 ++++++++++++++----- .../KafkaProducerConfigIntegrationTest.java | 33 +++++++++++++++++-- 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java index 2de5eb4..8d63122 100644 --- a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java +++ b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java @@ -1,8 +1,10 @@ package lt.satsyuk.distributed.audit.command.config; import lt.satsyuk.distributed.audit.event.AuditEvent; +import org.springframework.boot.context.properties.bind.Bindable; +import org.springframework.boot.context.properties.bind.Binder; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -10,10 +12,9 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.support.serializer.JacksonJsonSerializer; -import java.util.HashMap; import java.util.Map; +import java.util.HashMap; /** * Explicit producer bean for {@code KafkaTemplate}. @@ -24,18 +25,15 @@ @Configuration public class KafkaProducerConfig { - private static final String JSON_ADD_TYPE_HEADERS_CONFIG = "spring.json.add.type.headers"; - @Bean @Primary public ProducerFactory auditEventProducerFactory( - @Value("${spring.kafka.bootstrap-servers:localhost:9092}") String bootstrapServers + @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers, + ConfigurableEnvironment environment ) { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonJsonSerializer.class); - props.put(JSON_ADD_TYPE_HEADERS_CONFIG, false); + mergeKafkaOverrides(props, environment); return new DefaultKafkaProducerFactory<>(props); } @@ -46,5 +44,22 @@ public KafkaTemplate auditEventKafkaTemplate( ) { return new KafkaTemplate<>(auditEventProducerFactory); } + + private static void mergeKafkaOverrides(Map props, + ConfigurableEnvironment environment) { + Binder binder = Binder.get(environment); + + binder.bind("spring.kafka.properties", Bindable.mapOf(String.class, String.class)) + .ifBound(props::putAll); + + binder.bind("spring.kafka.producer", Bindable.mapOf(String.class, String.class)) + .ifBound(m -> m.forEach((key, value) -> { + if (key.startsWith("properties.")) { + props.put(key.substring("properties.".length()), value); + } else { + props.put(key.replace('-', '.'), value); + } + })); + } } diff --git a/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java b/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java index ef7b6b5..1d159bb 100644 --- a/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java +++ b/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java @@ -1,14 +1,19 @@ package lt.satsyuk.distributed.audit.command.config; -import lt.satsyuk.distributed.audit.event.AuditEvent; import lt.satsyuk.distributed.audit.command.service.AuditCommandPublisher; +import lt.satsyuk.distributed.audit.event.AuditEvent; +import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; +import java.util.Map; + import static org.assertj.core.api.Assertions.assertThat; /** @@ -20,16 +25,24 @@ @EmbeddedKafka(partitions = 1, topics = {"user.login.events"}) @TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}" + "spring.kafka.properties.client.id=command-service-it", + "spring.kafka.producer.properties.acks=all" }) class KafkaProducerConfigIntegrationTest { @Autowired private KafkaTemplate auditEventKafkaTemplate; + @Autowired + private DefaultKafkaProducerFactory auditEventProducerFactory; + @Autowired private AuditCommandPublisher auditCommandPublisher; + @Autowired + @Value("${spring.embedded.kafka.brokers}") + private String embeddedKafkaBrokers; + @Test void auditEventKafkaTemplateBeanIsCreated() { assertThat(auditEventKafkaTemplate).isNotNull(); @@ -39,5 +52,21 @@ void auditEventKafkaTemplateBeanIsCreated() { void auditCommandPublisherBeanIsCreated() { assertThat(auditCommandPublisher).isNotNull(); } + + @Test + void producerFactoryPicksUpSpringKafkaOverrides() { + Map configurationProperties = + auditEventProducerFactory.getConfigurationProperties(); + + assertThat(configurationProperties) + .containsEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBrokers) + .containsEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer") + .containsEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.springframework.kafka.support.serializer.JsonSerializer") + .containsEntry("spring.json.add.type.headers", "false") + .containsEntry("client.id", "command-service-it") + .containsEntry("acks", "all"); + } } From a60307e29e92c0554a63999e1115a946f5fa070e Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 24 May 2026 20:10:38 +0300 Subject: [PATCH 5/9] [#154] Remove deprecated JsonSerializer references --- .../command/config/KafkaProducerConfig.java | 8 ++++++ .../KafkaProducerConfigIntegrationTest.java | 27 ++++++++++++++----- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java index 8d63122..307b5de 100644 --- a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java +++ b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java @@ -12,6 +12,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.apache.kafka.common.serialization.StringSerializer; import java.util.Map; import java.util.HashMap; @@ -25,6 +26,10 @@ @Configuration public class KafkaProducerConfig { + private static final String JSON_SERIALIZER_FQCN = + "org.springframework.kafka.support.serializer.JsonSerializer"; + private static final String JSON_ADD_TYPE_HEADERS_CONFIG = "spring.json.add.type.headers"; + @Bean @Primary public ProducerFactory auditEventProducerFactory( @@ -33,6 +38,9 @@ public ProducerFactory auditEventProducerFactory( ) { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JSON_SERIALIZER_FQCN); + props.put(JSON_ADD_TYPE_HEADERS_CONFIG, false); mergeKafkaOverrides(props, environment); return new DefaultKafkaProducerFactory<>(props); } diff --git a/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java b/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java index 1d159bb..4506113 100644 --- a/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java +++ b/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java @@ -3,12 +3,14 @@ import lt.satsyuk.distributed.audit.command.service.AuditCommandPublisher; import lt.satsyuk.distributed.audit.event.AuditEvent; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; @@ -30,11 +32,14 @@ }) class KafkaProducerConfigIntegrationTest { + private static final String JSON_SERIALIZER_FQCN = + "org.springframework.kafka.support.serializer.JsonSerializer"; + @Autowired private KafkaTemplate auditEventKafkaTemplate; @Autowired - private DefaultKafkaProducerFactory auditEventProducerFactory; + private ProducerFactory auditEventProducerFactory; @Autowired private AuditCommandPublisher auditCommandPublisher; @@ -55,18 +60,26 @@ void auditCommandPublisherBeanIsCreated() { @Test void producerFactoryPicksUpSpringKafkaOverrides() { + assertThat(auditEventProducerFactory).isInstanceOf(DefaultKafkaProducerFactory.class); + + @SuppressWarnings("unchecked") + DefaultKafkaProducerFactory defaultFactory = + (DefaultKafkaProducerFactory) auditEventProducerFactory; + Map configurationProperties = - auditEventProducerFactory.getConfigurationProperties(); + defaultFactory.getConfigurationProperties(); assertThat(configurationProperties) .containsEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBrokers) - .containsEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.StringSerializer") - .containsEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - "org.springframework.kafka.support.serializer.JsonSerializer") - .containsEntry("spring.json.add.type.headers", "false") .containsEntry("client.id", "command-service-it") .containsEntry("acks", "all"); + + assertThat(configurationProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) + .isIn(StringSerializer.class, StringSerializer.class.getName()); + assertThat(configurationProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) + .isIn(JSON_SERIALIZER_FQCN); + assertThat(configurationProperties.get("spring.json.add.type.headers")) + .isIn(false, "false"); } } From e6eb8ec0752e9cd416f5248ec3eb411e29b9c260 Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 24 May 2026 20:24:58 +0300 Subject: [PATCH 6/9] [#154] Address remaining review feedback --- .../audit/command/config/KafkaProducerConfig.java | 8 ++++---- .../config/KafkaProducerConfigIntegrationTest.java | 11 ++++++++--- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java index 307b5de..7fa5689 100644 --- a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java +++ b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java @@ -37,11 +37,11 @@ public ProducerFactory auditEventProducerFactory( ConfigurableEnvironment environment ) { Map props = new HashMap<>(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JSON_SERIALIZER_FQCN); - props.put(JSON_ADD_TYPE_HEADERS_CONFIG, false); mergeKafkaOverrides(props, environment); + props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JSON_SERIALIZER_FQCN); + props.putIfAbsent(JSON_ADD_TYPE_HEADERS_CONFIG, false); return new DefaultKafkaProducerFactory<>(props); } diff --git a/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java b/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java index 4506113..856a3c1 100644 --- a/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java +++ b/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java @@ -44,7 +44,6 @@ class KafkaProducerConfigIntegrationTest { @Autowired private AuditCommandPublisher auditCommandPublisher; - @Autowired @Value("${spring.embedded.kafka.brokers}") private String embeddedKafkaBrokers; @@ -76,8 +75,14 @@ void producerFactoryPicksUpSpringKafkaOverrides() { assertThat(configurationProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) .isIn(StringSerializer.class, StringSerializer.class.getName()); - assertThat(configurationProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) - .isIn(JSON_SERIALIZER_FQCN); + Object valueSerializer = configurationProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + assertThat(valueSerializer).satisfiesAnyOf( + serializer -> assertThat(serializer).isEqualTo(JSON_SERIALIZER_FQCN), + serializer -> assertThat(serializer) + .isInstanceOf(Class.class) + .extracting(candidate -> ((Class) candidate).getName()) + .isEqualTo(JSON_SERIALIZER_FQCN) + ); assertThat(configurationProperties.get("spring.json.add.type.headers")) .isIn(false, "false"); } From ed5af5939e1a4442208c301b1d6a52ec8aee3b7f Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 24 May 2026 20:35:17 +0300 Subject: [PATCH 7/9] [#154] Address review: use JsonSerializer.class literal and @CommandServiceIntegrationTest meta-annotation Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../audit/command/config/KafkaProducerConfig.java | 5 ++--- .../command/config/KafkaProducerConfigIntegrationTest.java | 6 ++---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java index 7fa5689..f909bda 100644 --- a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java +++ b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java @@ -13,6 +13,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.Map; import java.util.HashMap; @@ -26,8 +27,6 @@ @Configuration public class KafkaProducerConfig { - private static final String JSON_SERIALIZER_FQCN = - "org.springframework.kafka.support.serializer.JsonSerializer"; private static final String JSON_ADD_TYPE_HEADERS_CONFIG = "spring.json.add.type.headers"; @Bean @@ -40,7 +39,7 @@ public ProducerFactory auditEventProducerFactory( mergeKafkaOverrides(props, environment); props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JSON_SERIALIZER_FQCN); + props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.putIfAbsent(JSON_ADD_TYPE_HEADERS_CONFIG, false); return new DefaultKafkaProducerFactory<>(props); } diff --git a/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java b/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java index 856a3c1..3dae6f6 100644 --- a/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java +++ b/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java @@ -1,5 +1,6 @@ package lt.satsyuk.distributed.audit.command.config; +import lt.satsyuk.distributed.audit.command.CommandServiceIntegrationTest; import lt.satsyuk.distributed.audit.command.service.AuditCommandPublisher; import lt.satsyuk.distributed.audit.event.AuditEvent; import org.apache.kafka.clients.producer.ProducerConfig; @@ -7,11 +8,9 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import java.util.Map; @@ -23,8 +22,7 @@ * {@code KafkaTemplate} bean and that {@link AuditCommandPublisher} * is wired successfully from the real application context. */ -@SpringBootTest -@EmbeddedKafka(partitions = 1, topics = {"user.login.events"}) +@CommandServiceIntegrationTest @TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.properties.client.id=command-service-it", From 2f792e90b342c7594892b56124e00976edf12454 Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 24 May 2026 20:42:01 +0300 Subject: [PATCH 8/9] [#154] Address review: always apply bootstrap.servers after merging overrides Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../distributed/audit/command/config/KafkaProducerConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java index f909bda..313d883 100644 --- a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java +++ b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java @@ -37,7 +37,7 @@ public ProducerFactory auditEventProducerFactory( ) { Map props = new HashMap<>(); mergeKafkaOverrides(props, environment); - props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.putIfAbsent(JSON_ADD_TYPE_HEADERS_CONFIG, false); From 9f6e72a6979745140525420b35aa5a54576fa690 Mon Sep 17 00:00:00 2001 From: Igor Date: Sun, 24 May 2026 20:51:06 +0300 Subject: [PATCH 9/9] [#154] Address review: use putIfAbsent for bootstrap.servers to allow producer-specific overrides Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../distributed/audit/command/config/KafkaProducerConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java index 313d883..f909bda 100644 --- a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java +++ b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java @@ -37,7 +37,7 @@ public ProducerFactory auditEventProducerFactory( ) { Map props = new HashMap<>(); mergeKafkaOverrides(props, environment); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.putIfAbsent(JSON_ADD_TYPE_HEADERS_CONFIG, false);