Skip to content

Commit 73cb15e

Browse files
authored
KAFKA-16600: Add unit test for idempotent PENDING_SHUTDOWN state transition (#21671)
The change includes a unit test that reliably fails before the fix from KAFKA-17379 and reliably passes afterward. Reviewers: Matthias J. Sax <matthias@confluent.io>
1 parent 1b69c09 commit 73cb15e

2 files changed

Lines changed: 71 additions & 1 deletion

File tree

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1585,7 +1585,7 @@ boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.Close
15851585
}
15861586
}
15871587

1588-
private void closeToError() {
1588+
void closeToError() {
15891589
if (!setState(State.PENDING_ERROR)) {
15901590
log.info("Skipping shutdown since we are already in {}", state());
15911591
} else {

streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1914,6 +1914,76 @@ public void shouldNotCallCleanOnStartupByDefault() {
19141914
}
19151915
}
19161916

1917+
@Test
1918+
public void shouldHandleCloseAfterErrorState() throws Exception {
1919+
// Regression test for the race condition bug fixed by KAFKA-17379 that also fixed KAFKA-16600.
1920+
prepareStreams();
1921+
final AtomicReference<StreamThread.State> state1 = prepareStreamThread(streamThreadOne, 1);
1922+
final AtomicReference<StreamThread.State> state2 = prepareStreamThread(streamThreadTwo, 2);
1923+
prepareThreadState(streamThreadOne, state1);
1924+
prepareThreadState(streamThreadTwo, state2);
1925+
1926+
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
1927+
streams.start();
1928+
waitForCondition(
1929+
() -> streams.state() == KafkaStreams.State.RUNNING,
1930+
"Streams never started"
1931+
);
1932+
1933+
final int numberOfConcurrentCloseThreads = 10;
1934+
final AtomicReference<Throwable> closeException = new AtomicReference<>();
1935+
final CountDownLatch startLatch = new CountDownLatch(1);
1936+
final CountDownLatch completionLatch = new CountDownLatch(numberOfConcurrentCloseThreads + 1);
1937+
1938+
// Launch multiple close() threads
1939+
for (int i = 0; i < numberOfConcurrentCloseThreads; i++) {
1940+
new Thread(
1941+
() -> {
1942+
try {
1943+
startLatch.await();
1944+
streams.close(Duration.ofSeconds(10));
1945+
} catch (final Throwable t) {
1946+
closeException.compareAndSet(null, t);
1947+
} finally {
1948+
completionLatch.countDown();
1949+
}
1950+
},
1951+
"CloseThread-" + i
1952+
).start();
1953+
}
1954+
1955+
// Launch error thread
1956+
new Thread(
1957+
() -> {
1958+
try {
1959+
startLatch.await();
1960+
streams.closeToError();
1961+
} catch (final Throwable t) {
1962+
// Ignore - this is expected to race
1963+
} finally {
1964+
completionLatch.countDown();
1965+
}
1966+
},
1967+
"ErrorThread"
1968+
).start();
1969+
1970+
// Start the race
1971+
startLatch.countDown();
1972+
1973+
// Wait for completion
1974+
assertTrue(
1975+
completionLatch.await(15, TimeUnit.SECONDS),
1976+
"All threads should complete within timeout"
1977+
);
1978+
1979+
if (closeException.get() != null) {
1980+
// Before fix: StreamsException("Failed to shut down while in state ERROR")
1981+
// After fix: No exception
1982+
fail("Race condition detected; close() threw exception", closeException.get());
1983+
}
1984+
}
1985+
}
1986+
19171987
private Topology getStatefulTopology(final String inputTopic,
19181988
final String outputTopic,
19191989
final String globalTopicName,

0 commit comments

Comments
 (0)