Skip to content

Commit a683629

Browse files
committed
chore(demo-kafka): fix org.apache.kafka.common.errors.TimeoutException
1 parent 4d32a4e commit a683629

4 files changed

Lines changed: 27 additions & 18 deletions

File tree

demo-kafka/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@
4949
<artifactId>spring-boot-starter-test</artifactId>
5050
<scope>test</scope>
5151
</dependency>
52+
<dependency>
53+
<groupId>org.springframework.boot</groupId>
54+
<artifactId>spring-boot-testcontainers</artifactId>
55+
<scope>test</scope>
56+
</dependency>
5257
<dependency>
5358
<groupId>org.springframework.kafka</groupId>
5459
<artifactId>spring-kafka-test</artifactId>

demo-kafka/src/main/resources/application.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ spring:
44
consumer:
55
# set consumer group id (blank for auto)
66
group-id: ${KAFKA_CONSUMER_GROUP_ID:}
7-
auto-offset-reset: ${KAFKA_CONSUMER_AUTO_OFFSET_RESET:latest}
7+
auto-offset-reset: ${KAFKA_CONSUMER_AUTO_OFFSET_RESET:earliest}
88
max-poll-records: ${KAFKA_CONSUMER_MAX_POLL_RECORDS:500}
99
max-partition-fetch-bytes: ${KAFKA_MAX_PARTITION_FETCH_BYTES:1000000}
1010

demo-kafka/src/test/java/com/helltractor/demo/container/ConfluentKafkaContainerCluster.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.helltractor.demo.container;
22

33
import org.apache.kafka.common.Uuid;
4-
import org.apache.kafka.clients.NetworkClient;
54
import org.awaitility.Awaitility;
65
import org.testcontainers.containers.Container;
76
import org.testcontainers.containers.GenericContainer;

demo-kafka/src/test/java/com/helltractor/demo/messaging/MessagingFactoryTest.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.helltractor.demo.messaging;
22

33
import com.helltractor.demo.KafkaApplication;
4-
import com.helltractor.demo.container.ConfluentKafkaContainerCluster;
4+
import com.helltractor.demo.container.KafkaContainerCluster;
55
import com.helltractor.demo.message.TestMessage;
66
import com.helltractor.demo.util.IpUtil;
77
import org.awaitility.Awaitility;
@@ -12,8 +12,11 @@
1212
import org.slf4j.LoggerFactory;
1313
import org.springframework.beans.factory.annotation.Autowired;
1414
import org.springframework.boot.test.context.SpringBootTest;
15+
import org.springframework.test.context.DynamicPropertyRegistry;
16+
import org.springframework.test.context.DynamicPropertySource;
1517
import org.testcontainers.junit.jupiter.Container;
1618
import org.testcontainers.junit.jupiter.Testcontainers;
19+
import org.testcontainers.kafka.KafkaContainer;
1720

1821
import java.time.Duration;
1922
import java.util.List;
@@ -27,8 +30,11 @@ public final class MessagingFactoryTest {
2730

2831
final static Logger logger = LoggerFactory.getLogger(MessagingFactoryTest.class);
2932

33+
// @Container
34+
// static KafkaContainer kafka = new KafkaContainer("apache/kafka:3.8.0");
35+
3036
@Container
31-
static ConfluentKafkaContainerCluster kafkaCluster = new ConfluentKafkaContainerCluster("7.4.0", 1, 1);
37+
static KafkaContainerCluster kafkaCluster = new KafkaContainerCluster("6.2.1", 1, 1);
3238

3339
static class TestConsumer {
3440

@@ -48,32 +54,31 @@ int getTotalMessages() {
4854
}
4955
}
5056

57+
@DynamicPropertySource
58+
static void dynamicProperties(DynamicPropertyRegistry registry) {
59+
registry.add("spring.kafka.bootstrap-servers", kafkaCluster::getBootstrapServers);
60+
}
61+
5162
@Autowired
5263
MessagingFactory messagingFactory;
5364

54-
MessageProducer<TestMessage> processorOne;
55-
MessageProducer<TestMessage> processorTwo;
56-
MessageProducer<TestMessage> processorThree;
57-
5865
@BeforeEach
5966
void init() {
67+
// kafka.start();
6068
kafkaCluster.start();
61-
Awaitility.await()
62-
.atMost(Duration.ofMinutes(1))
63-
.pollInterval(Duration.ofSeconds(5))
64-
.until(() -> kafkaCluster.getBrokers().stream().allMatch(b -> b.isRunning()));
65-
processorOne = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_ONE, TestMessage.class);
66-
processorTwo = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_TWO, TestMessage.class);
67-
processorThree = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_THREE, TestMessage.class);
6869
}
69-
70+
7071
@AfterEach
7172
void destroy() {
73+
// kafka.stop();
7274
kafkaCluster.stop();
7375
}
7476

7577
@Test
76-
void test() throws InterruptedException {
78+
void test() {
79+
MessageProducer<TestMessage> processorOne = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_ONE, TestMessage.class);
80+
MessageProducer<TestMessage> processorTwo = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_TWO, TestMessage.class);
81+
MessageProducer<TestMessage> processorThree = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_THREE, TestMessage.class);
7782
for (int i = 0; i < 100; i++) {
7883
TestMessage testMessage = new TestMessage();
7984
testMessage.message = "Test-" + i;
@@ -90,7 +95,7 @@ void test() throws InterruptedException {
9095

9196
Awaitility.await()
9297
.atMost(Duration.ofSeconds(20))
93-
.until(() -> testConsumer.getTotalMessages() >= 300);
98+
.until(() -> testConsumer.getTotalMessages() == 300);
9499

95100
assertEquals(300, testConsumer.getTotalMessages());
96101
}

0 commit comments

Comments
 (0)