-
Notifications
You must be signed in to change notification settings - Fork 0
Fix command-service KafkaTemplate bean wiring #154
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
33ca016
Fix command-service KafkaTemplate wiring
4699214
Address review feedback for KafkaProducerConfig
cd391f5
[#154] Add SpringBootTest for KafkaProducerConfig bean wiring
bbb4640
[#154] Align Kafka producer wiring with Spring config
a60307e
[#154] Remove deprecated JsonSerializer references
e6eb8ec
[#154] Address remaining review feedback
ed5af59
[#154] Address review: use JsonSerializer.class literal and @CommandS…
2f792e9
[#154] Address review: always apply bootstrap.servers after merging o…
9f6e72a
[#154] Address review: use putIfAbsent for bootstrap.servers to allow…
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
72 changes: 72 additions & 0 deletions
72
...ervice/src/main/java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfig.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| package lt.satsyuk.distributed.audit.command.config; | ||
|
|
||
| import lt.satsyuk.distributed.audit.event.AuditEvent; | ||
| import org.springframework.boot.context.properties.bind.Bindable; | ||
| import org.springframework.boot.context.properties.bind.Binder; | ||
| import org.apache.kafka.clients.producer.ProducerConfig; | ||
| import org.springframework.core.env.ConfigurableEnvironment; | ||
| import org.springframework.beans.factory.annotation.Value; | ||
| import org.springframework.context.annotation.Bean; | ||
| import org.springframework.context.annotation.Configuration; | ||
| import org.springframework.context.annotation.Primary; | ||
| import org.springframework.kafka.core.DefaultKafkaProducerFactory; | ||
| import org.springframework.kafka.core.KafkaTemplate; | ||
| import org.springframework.kafka.core.ProducerFactory; | ||
| import org.apache.kafka.common.serialization.StringSerializer; | ||
| import org.springframework.kafka.support.serializer.JsonSerializer; | ||
|
|
||
| import java.util.Map; | ||
| import java.util.HashMap; | ||
|
|
||
| /** | ||
| * Explicit producer bean for {@code KafkaTemplate<String, AuditEvent>}. | ||
| * | ||
| * <p>Spring Boot 4 may otherwise expose only a raw {@code KafkaTemplate<?, ?>} | ||
| * bean, which does not satisfy the strongly-typed constructor dependency. | ||
| */ | ||
| @Configuration | ||
| public class KafkaProducerConfig { | ||
|
|
||
| private static final String JSON_ADD_TYPE_HEADERS_CONFIG = "spring.json.add.type.headers"; | ||
|
|
||
| @Bean | ||
| @Primary | ||
| public ProducerFactory<String, AuditEvent> auditEventProducerFactory( | ||
| @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers, | ||
| ConfigurableEnvironment environment | ||
| ) { | ||
| Map<String, Object> props = new HashMap<>(); | ||
| mergeKafkaOverrides(props, environment); | ||
| props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | ||
| props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | ||
| props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); | ||
| props.putIfAbsent(JSON_ADD_TYPE_HEADERS_CONFIG, false); | ||
|
igorsatsyuk marked this conversation as resolved.
|
||
| return new DefaultKafkaProducerFactory<>(props); | ||
|
igorsatsyuk marked this conversation as resolved.
igorsatsyuk marked this conversation as resolved.
igorsatsyuk marked this conversation as resolved.
|
||
| } | ||
|
igorsatsyuk marked this conversation as resolved.
|
||
|
|
||
| @Bean | ||
| @Primary | ||
| public KafkaTemplate<String, AuditEvent> auditEventKafkaTemplate( | ||
| ProducerFactory<String, AuditEvent> auditEventProducerFactory | ||
| ) { | ||
|
igorsatsyuk marked this conversation as resolved.
|
||
| return new KafkaTemplate<>(auditEventProducerFactory); | ||
|
igorsatsyuk marked this conversation as resolved.
|
||
| } | ||
|
|
||
| private static void mergeKafkaOverrides(Map<String, Object> props, | ||
| ConfigurableEnvironment environment) { | ||
| Binder binder = Binder.get(environment); | ||
|
|
||
| binder.bind("spring.kafka.properties", Bindable.mapOf(String.class, String.class)) | ||
| .ifBound(props::putAll); | ||
|
|
||
| binder.bind("spring.kafka.producer", Bindable.mapOf(String.class, String.class)) | ||
| .ifBound(m -> m.forEach((key, value) -> { | ||
| if (key.startsWith("properties.")) { | ||
| props.put(key.substring("properties.".length()), value); | ||
| } else { | ||
| props.put(key.replace('-', '.'), value); | ||
| } | ||
| })); | ||
|
igorsatsyuk marked this conversation as resolved.
|
||
| } | ||
| } | ||
|
|
||
88 changes: 88 additions & 0 deletions
88
.../java/lt/satsyuk/distributed/audit/command/config/KafkaProducerConfigIntegrationTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| package lt.satsyuk.distributed.audit.command.config; | ||
|
|
||
| import lt.satsyuk.distributed.audit.command.CommandServiceIntegrationTest; | ||
| import lt.satsyuk.distributed.audit.command.service.AuditCommandPublisher; | ||
| import lt.satsyuk.distributed.audit.event.AuditEvent; | ||
| import org.apache.kafka.clients.producer.ProducerConfig; | ||
| import org.apache.kafka.common.serialization.StringSerializer; | ||
| import org.junit.jupiter.api.Test; | ||
| import org.springframework.beans.factory.annotation.Autowired; | ||
| import org.springframework.beans.factory.annotation.Value; | ||
| import org.springframework.kafka.core.DefaultKafkaProducerFactory; | ||
| import org.springframework.kafka.core.KafkaTemplate; | ||
| import org.springframework.kafka.core.ProducerFactory; | ||
| import org.springframework.test.context.TestPropertySource; | ||
|
igorsatsyuk marked this conversation as resolved.
|
||
|
|
||
| import java.util.Map; | ||
|
|
||
| import static org.assertj.core.api.Assertions.assertThat; | ||
|
|
||
| /** | ||
| * Verifies that {@link KafkaProducerConfig} correctly registers a strongly-typed | ||
| * {@code KafkaTemplate<String, AuditEvent>} bean and that {@link AuditCommandPublisher} | ||
| * is wired successfully from the real application context. | ||
| */ | ||
| @CommandServiceIntegrationTest | ||
| @TestPropertySource(properties = { | ||
| "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", | ||
| "spring.kafka.properties.client.id=command-service-it", | ||
| "spring.kafka.producer.properties.acks=all" | ||
| }) | ||
| class KafkaProducerConfigIntegrationTest { | ||
|
|
||
| private static final String JSON_SERIALIZER_FQCN = | ||
| "org.springframework.kafka.support.serializer.JsonSerializer"; | ||
|
|
||
| @Autowired | ||
| private KafkaTemplate<String, AuditEvent> auditEventKafkaTemplate; | ||
|
|
||
| @Autowired | ||
| private ProducerFactory<String, AuditEvent> auditEventProducerFactory; | ||
|
|
||
| @Autowired | ||
| private AuditCommandPublisher auditCommandPublisher; | ||
|
|
||
| @Value("${spring.embedded.kafka.brokers}") | ||
| private String embeddedKafkaBrokers; | ||
|
|
||
| @Test | ||
| void auditEventKafkaTemplateBeanIsCreated() { | ||
| assertThat(auditEventKafkaTemplate).isNotNull(); | ||
| } | ||
|
|
||
| @Test | ||
| void auditCommandPublisherBeanIsCreated() { | ||
| assertThat(auditCommandPublisher).isNotNull(); | ||
| } | ||
|
|
||
| @Test | ||
| void producerFactoryPicksUpSpringKafkaOverrides() { | ||
| assertThat(auditEventProducerFactory).isInstanceOf(DefaultKafkaProducerFactory.class); | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| DefaultKafkaProducerFactory<String, AuditEvent> defaultFactory = | ||
| (DefaultKafkaProducerFactory<String, AuditEvent>) auditEventProducerFactory; | ||
|
|
||
| Map<String, Object> configurationProperties = | ||
| defaultFactory.getConfigurationProperties(); | ||
|
|
||
| assertThat(configurationProperties) | ||
| .containsEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBrokers) | ||
| .containsEntry("client.id", "command-service-it") | ||
| .containsEntry("acks", "all"); | ||
|
|
||
| assertThat(configurationProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) | ||
| .isIn(StringSerializer.class, StringSerializer.class.getName()); | ||
| Object valueSerializer = configurationProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); | ||
| assertThat(valueSerializer).satisfiesAnyOf( | ||
| serializer -> assertThat(serializer).isEqualTo(JSON_SERIALIZER_FQCN), | ||
| serializer -> assertThat(serializer) | ||
| .isInstanceOf(Class.class) | ||
| .extracting(candidate -> ((Class<?>) candidate).getName()) | ||
| .isEqualTo(JSON_SERIALIZER_FQCN) | ||
| ); | ||
| assertThat(configurationProperties.get("spring.json.add.type.headers")) | ||
| .isIn(false, "false"); | ||
| } | ||
| } | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.