Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package lt.satsyuk.distributed.audit.command.config;

import lt.satsyuk.distributed.audit.event.AuditEvent;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.Map;
import java.util.HashMap;

/**
* Explicit producer bean for {@code KafkaTemplate<String, AuditEvent>}.
*
* <p>Spring Boot 4 may otherwise expose only a raw {@code KafkaTemplate<?, ?>}
* bean, which does not satisfy the strongly-typed constructor dependency.
*/
@Configuration
public class KafkaProducerConfig {

private static final String JSON_ADD_TYPE_HEADERS_CONFIG = "spring.json.add.type.headers";

@Bean
@Primary
public ProducerFactory<String, AuditEvent> auditEventProducerFactory(
@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers,
ConfigurableEnvironment environment
) {
Comment thread
igorsatsyuk marked this conversation as resolved.
Map<String, Object> props = new HashMap<>();
mergeKafkaOverrides(props, environment);
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.putIfAbsent(JSON_ADD_TYPE_HEADERS_CONFIG, false);
Comment thread
igorsatsyuk marked this conversation as resolved.
return new DefaultKafkaProducerFactory<>(props);
Comment thread
igorsatsyuk marked this conversation as resolved.
Comment thread
igorsatsyuk marked this conversation as resolved.
Comment thread
igorsatsyuk marked this conversation as resolved.
}
Comment thread
igorsatsyuk marked this conversation as resolved.

@Bean
@Primary
public KafkaTemplate<String, AuditEvent> auditEventKafkaTemplate(
ProducerFactory<String, AuditEvent> auditEventProducerFactory
) {
Comment thread
igorsatsyuk marked this conversation as resolved.
return new KafkaTemplate<>(auditEventProducerFactory);
Comment thread
igorsatsyuk marked this conversation as resolved.
}

private static void mergeKafkaOverrides(Map<String, Object> props,
ConfigurableEnvironment environment) {
Binder binder = Binder.get(environment);

binder.bind("spring.kafka.properties", Bindable.mapOf(String.class, String.class))
.ifBound(props::putAll);

binder.bind("spring.kafka.producer", Bindable.mapOf(String.class, String.class))
.ifBound(m -> m.forEach((key, value) -> {
if (key.startsWith("properties.")) {
props.put(key.substring("properties.".length()), value);
} else {
props.put(key.replace('-', '.'), value);
}
}));
Comment thread
igorsatsyuk marked this conversation as resolved.
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package lt.satsyuk.distributed.audit.command.config;

import lt.satsyuk.distributed.audit.command.CommandServiceIntegrationTest;
import lt.satsyuk.distributed.audit.command.service.AuditCommandPublisher;
import lt.satsyuk.distributed.audit.event.AuditEvent;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.test.context.TestPropertySource;
Comment thread
igorsatsyuk marked this conversation as resolved.

import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Verifies that {@link KafkaProducerConfig} correctly registers a strongly-typed
* {@code KafkaTemplate<String, AuditEvent>} bean and that {@link AuditCommandPublisher}
* is wired successfully from the real application context.
*/
@CommandServiceIntegrationTest
@TestPropertySource(properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.properties.client.id=command-service-it",
"spring.kafka.producer.properties.acks=all"
})
class KafkaProducerConfigIntegrationTest {

private static final String JSON_SERIALIZER_FQCN =
"org.springframework.kafka.support.serializer.JsonSerializer";

@Autowired
private KafkaTemplate<String, AuditEvent> auditEventKafkaTemplate;

@Autowired
private ProducerFactory<String, AuditEvent> auditEventProducerFactory;

@Autowired
private AuditCommandPublisher auditCommandPublisher;

@Value("${spring.embedded.kafka.brokers}")
private String embeddedKafkaBrokers;

@Test
void auditEventKafkaTemplateBeanIsCreated() {
assertThat(auditEventKafkaTemplate).isNotNull();
}

@Test
void auditCommandPublisherBeanIsCreated() {
assertThat(auditCommandPublisher).isNotNull();
}

@Test
void producerFactoryPicksUpSpringKafkaOverrides() {
assertThat(auditEventProducerFactory).isInstanceOf(DefaultKafkaProducerFactory.class);

@SuppressWarnings("unchecked")
DefaultKafkaProducerFactory<String, AuditEvent> defaultFactory =
(DefaultKafkaProducerFactory<String, AuditEvent>) auditEventProducerFactory;

Map<String, Object> configurationProperties =
defaultFactory.getConfigurationProperties();

assertThat(configurationProperties)
.containsEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBrokers)
.containsEntry("client.id", "command-service-it")
.containsEntry("acks", "all");

assertThat(configurationProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
.isIn(StringSerializer.class, StringSerializer.class.getName());
Object valueSerializer = configurationProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
assertThat(valueSerializer).satisfiesAnyOf(
serializer -> assertThat(serializer).isEqualTo(JSON_SERIALIZER_FQCN),
serializer -> assertThat(serializer)
.isInstanceOf(Class.class)
.extracting(candidate -> ((Class<?>) candidate).getName())
.isEqualTo(JSON_SERIALIZER_FQCN)
);
assertThat(configurationProperties.get("spring.json.add.type.headers"))
.isIn(false, "false");
}
}

Loading