diff --git a/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java new file mode 100644 index 0000000..f909bda --- /dev/null +++ b/backend/command-service/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java @@ -0,0 +1,72 @@ +package lt.satsyuk.distributed.audit.command.config; + +import lt.satsyuk.distributed.audit.event.AuditEvent; +import org.springframework.boot.context.properties.bind.Bindable; +import org.springframework.boot.context.properties.bind.Binder; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.Map; +import java.util.HashMap; + +/** + * Explicit producer bean for {@code KafkaTemplate}. + * + *

Spring Boot 4 may otherwise expose only a raw {@code KafkaTemplate} + * bean, which does not satisfy the strongly-typed constructor dependency. + */ +@Configuration +public class KafkaProducerConfig { + + private static final String JSON_ADD_TYPE_HEADERS_CONFIG = "spring.json.add.type.headers"; + + @Bean + @Primary + public ProducerFactory auditEventProducerFactory( + @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers, + ConfigurableEnvironment environment + ) { + Map props = new HashMap<>(); + mergeKafkaOverrides(props, environment); + props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + props.putIfAbsent(JSON_ADD_TYPE_HEADERS_CONFIG, false); + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + @Primary + public KafkaTemplate auditEventKafkaTemplate( + ProducerFactory auditEventProducerFactory + ) { + return new KafkaTemplate<>(auditEventProducerFactory); + } + + private static void mergeKafkaOverrides(Map props, + ConfigurableEnvironment environment) { + Binder binder = Binder.get(environment); + + binder.bind("spring.kafka.properties", Bindable.mapOf(String.class, String.class)) + .ifBound(props::putAll); + + binder.bind("spring.kafka.producer", Bindable.mapOf(String.class, String.class)) + .ifBound(m -> m.forEach((key, value) -> { + if (key.startsWith("properties.")) { + props.put(key.substring("properties.".length()), value); + } else { + props.put(key.replace('-', '.'), value); + } + })); + } +} + diff --git a/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java b/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java new file mode 100644 index 0000000..3dae6f6 --- /dev/null +++ b/backend/command-service/src/test/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java @@ -0,0 +1,88 @@ +package lt.satsyuk.distributed.audit.command.config; + +import lt.satsyuk.distributed.audit.command.CommandServiceIntegrationTest; +import lt.satsyuk.distributed.audit.command.service.AuditCommandPublisher; +import lt.satsyuk.distributed.audit.event.AuditEvent; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.test.context.TestPropertySource; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Verifies that {@link KafkaProducerConfig} correctly registers a strongly-typed + * {@code KafkaTemplate} bean and that {@link AuditCommandPublisher} + * is wired successfully from the real application context. + */ +@CommandServiceIntegrationTest +@TestPropertySource(properties = { + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.properties.client.id=command-service-it", + "spring.kafka.producer.properties.acks=all" +}) +class KafkaProducerConfigIntegrationTest { + + private static final String JSON_SERIALIZER_FQCN = + "org.springframework.kafka.support.serializer.JsonSerializer"; + + @Autowired + private KafkaTemplate auditEventKafkaTemplate; + + @Autowired + private ProducerFactory auditEventProducerFactory; + + @Autowired + private AuditCommandPublisher auditCommandPublisher; + + @Value("${spring.embedded.kafka.brokers}") + private String embeddedKafkaBrokers; + + @Test + void auditEventKafkaTemplateBeanIsCreated() { + assertThat(auditEventKafkaTemplate).isNotNull(); + } + + @Test + void auditCommandPublisherBeanIsCreated() { + assertThat(auditCommandPublisher).isNotNull(); + } + + @Test + void producerFactoryPicksUpSpringKafkaOverrides() { + assertThat(auditEventProducerFactory).isInstanceOf(DefaultKafkaProducerFactory.class); + + @SuppressWarnings("unchecked") + DefaultKafkaProducerFactory defaultFactory = + (DefaultKafkaProducerFactory) auditEventProducerFactory; + + Map configurationProperties = + defaultFactory.getConfigurationProperties(); + + assertThat(configurationProperties) + .containsEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBrokers) + .containsEntry("client.id", "command-service-it") + .containsEntry("acks", "all"); + + assertThat(configurationProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) + .isIn(StringSerializer.class, StringSerializer.class.getName()); + Object valueSerializer = configurationProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + assertThat(valueSerializer).satisfiesAnyOf( + serializer -> assertThat(serializer).isEqualTo(JSON_SERIALIZER_FQCN), + serializer -> assertThat(serializer) + .isInstanceOf(Class.class) + .extracting(candidate -> ((Class) candidate).getName()) + .isEqualTo(JSON_SERIALIZER_FQCN) + ); + assertThat(configurationProperties.get("spring.json.add.type.headers")) + .isIn(false, "false"); + } +} +