From 0128c7d7337ca6b6fbd36c3142920c645f7d9614 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 5 May 2026 16:35:33 -0700 Subject: [PATCH 1/3] [fix][test] Make LocalBookkeeperEnsemble robust to port race and partial-setup cleanup `ShadowTopicRealBkTest.setup` occasionally fails with `BindException: Address already in use`: `PortManager.nextLockedFreePort()` only locks the port at the JVM level, not at the OS level, so another process can grab the port between allocation and the ZK bind. When that happens, the subsequent `cleanup()` call also fails with an NPE because `LocalBookkeeperEnsemble.stop()` dereferences `bookieComponents` (and other fields) that are still null from the aborted setup. * `LocalBookkeeperEnsemble.runZookeeper` now retries on `BindException` for up to 5 attempts, using a kernel-assigned port (port 0) on the last attempt. Callers must read the actual bound port via `getZookeeperPort()` after start. * `LocalBookkeeperEnsemble.stop` null-guards every field it touches so cleanup never throws an NPE when setup failed midway. * `ShadowTopicRealBkTest` now reads the bound ZK port back from the ensemble after start, so the metadata store URL stays correct even when the retry fallback kicks in. The cleanup path also tolerates a partially initialized PulsarService. 
--- .../zookeeper/LocalBookkeeperEnsemble.java | 113 ++++++++++++++---- .../persistent/ShadowTopicRealBkTest.java | 13 +- 2 files changed, 98 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index b3c2b8de5bdef..5d0afdf149cfd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -33,6 +33,7 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.lang.reflect.Method; +import java.net.BindException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.URI; @@ -200,24 +201,58 @@ private void runZookeeper(int maxCC) throws IOException { cleanDirectory(zkDataDir); } - try { - // Allow all commands on ZK control port - System.setProperty("zookeeper.4lw.commands.whitelist", "*"); - zks = new ZooKeeperServer(zkDataDir, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME); + // The requested zkPort may have been grabbed by another process between port allocation + // and bind (PortManager only locks the port at the JVM level, not the OS level). Retry + // a few times to avoid spurious test failures, then fall back to a kernel-assigned port. + // Callers should read back the actual bound port via getZookeeperPort(). + int maxAttempts = 5; + BindException lastBindException = null; + for (int attempt = 0; attempt < maxAttempts; attempt++) { + int attemptPort = (attempt == maxAttempts - 1) ? 
0 : zkPort; + try { + // Allow all commands on ZK control port + System.setProperty("zookeeper.4lw.commands.whitelist", "*"); + zks = new ZooKeeperServer(zkDataDir, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME); - serverFactory = new NIOServerCnxnFactory(); - serverFactory.configure(new InetSocketAddress(zkPort), maxCC); - serverFactory.startup(zks); + serverFactory = new NIOServerCnxnFactory(); + serverFactory.configure(new InetSocketAddress(attemptPort), maxCC); + serverFactory.startup(zks); - zkDataCleanupManager = new DatadirCleanupManager(zkDataDir, zkDataDir, 3, 1 /* hour */); - zkDataCleanupManager.start(); - } catch (Exception e) { - log.error().exception(e).log("Exception while instantiating ZooKeeper"); + zkDataCleanupManager = new DatadirCleanupManager(zkDataDir, zkDataDir, 3, 1 /* hour */); + zkDataCleanupManager.start(); + lastBindException = null; + break; + } catch (BindException be) { + lastBindException = be; + log.warn().attr("port", attemptPort).attr("attempt", attempt + 1) + .exception(be).log("ZooKeeper bind failed, will retry"); + if (serverFactory != null) { + try { + serverFactory.shutdown(); + } catch (Exception ignored) { + // best effort + } + serverFactory = null; + } + if (zks != null) { + try { + zks.shutdown(); + } catch (Exception ignored) { + // best effort + } + zks = null; + } + } catch (Exception e) { + log.error().exception(e).log("Exception while instantiating ZooKeeper"); - if (serverFactory != null) { - serverFactory.shutdown(); + if (serverFactory != null) { + serverFactory.shutdown(); + } + throw new IOException(e); } - throw new IOException(e); + } + if (lastBindException != null) { + throw new IOException("Unable to bind ZooKeeper after " + maxAttempts + " attempts", lastBindException); } this.zkPort = serverFactory.getLocalPort(); @@ -506,23 +541,47 @@ public void startBK() throws Exception { public void stop() throws Exception { if (null != streamStorage) { log.debug("Local bk stream storage stopping ..."); - 
streamStorage.close(); + try { + streamStorage.close(); + } catch (Exception e) { + log.warn().exception(e).log("failed to shutdown stream storage"); + } } log.debug("Local ZK/BK stopping ..."); - for (LifecycleComponent bookie : bookieComponents) { - try { - if (bookie != null) { - bookie.close(); + if (bookieComponents != null) { + for (LifecycleComponent bookie : bookieComponents) { + try { + if (bookie != null) { + bookie.close(); + } + } catch (Exception e) { + log.warn().exception(e).log("failed to shutdown bookie"); } - } catch (Exception e) { - log.warn().exception(e).log("failed to shutdown bookie"); } } - zkc.close(); - zks.shutdown(); - serverFactory.shutdown(); + if (zkc != null) { + try { + zkc.close(); + } catch (Exception e) { + log.warn().exception(e).log("failed to close zk client"); + } + } + if (zks != null) { + try { + zks.shutdown(); + } catch (Exception e) { + log.warn().exception(e).log("failed to shutdown zk server"); + } + } + if (serverFactory != null) { + try { + serverFactory.shutdown(); + } catch (Exception e) { + log.warn().exception(e).log("failed to shutdown zk server factory"); + } + } if (zkDataCleanupManager != null) { zkDataCleanupManager.shutdown(); @@ -530,7 +589,11 @@ public void stop() throws Exception { log.debug("Local ZK/BK stopped"); for (File managedDir : temporaryDirectories) { log.info().attr("directory", managedDir).log("deleting test directory"); - FileUtils.deleteDirectory(managedDir); + try { + FileUtils.deleteDirectory(managedDir); + } catch (Exception e) { + log.warn().attr("directory", managedDir).exception(e).log("failed to delete test directory"); + } } temporaryDirectories.clear(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java index b0e572a826c47..f437f62bd2a31 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java @@ -42,14 +42,17 @@ public class ShadowTopicRealBkTest { private static final String cluster = "test"; - private final int zkPort = PortManager.nextLockedFreePort(); - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, + PortManager.nextLockedFreePort(), PortManager::nextLockedFreePort); private PulsarService pulsar; private PulsarAdmin admin; @BeforeClass public void setup() throws Exception { bk.start(); + // Read the actual bound ZK port: LocalBookkeeperEnsemble may have retried with a different port + // if the original one was grabbed by another process between allocation and bind. + final int zkPort = bk.getZookeeperPort(); final var config = new ServiceConfiguration(); config.setClusterName(cluster); config.setAdvertisedAddress("localhost"); @@ -68,7 +71,11 @@ public void setup() throws Exception { @AfterClass(alwaysRun = true) public void cleanup() throws Exception { if (pulsar != null) { - pulsar.close(); + try { + pulsar.close(); + } catch (Exception e) { + // best effort cleanup; setup may have failed before pulsar was fully initialized + } } bk.stop(); } From d20e73e64408deca31c712eb18a8d692d9d0eb52 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 5 May 2026 16:43:29 -0700 Subject: [PATCH 2/3] Simplify: use port 0 instead of retrying allocated port Drop the BindException retry loop in LocalBookkeeperEnsemble.runZookeeper and instead pass 0 for the ZK port from the test. The kernel guarantees a free port at bind time, so there is no race window. The actual bound port is read back via bk.getZookeeperPort(). 
The cleanup null-safety fix in LocalBookkeeperEnsemble.stop() is kept because any partial-init failure (not just port bind) would otherwise NPE the cleanup. --- .../zookeeper/LocalBookkeeperEnsemble.java | 63 +++++-------------- .../persistent/ShadowTopicRealBkTest.java | 8 +-- 2 files changed, 18 insertions(+), 53 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index 5d0afdf149cfd..d8c67568304a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -33,7 +33,6 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.lang.reflect.Method; -import java.net.BindException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.URI; @@ -201,58 +200,24 @@ private void runZookeeper(int maxCC) throws IOException { cleanDirectory(zkDataDir); } - // The requested zkPort may have been grabbed by another process between port allocation - // and bind (PortManager only locks the port at the JVM level, not the OS level). Retry - // a few times to avoid spurious test failures, then fall back to a kernel-assigned port. - // Callers should read back the actual bound port via getZookeeperPort(). - int maxAttempts = 5; - BindException lastBindException = null; - for (int attempt = 0; attempt < maxAttempts; attempt++) { - int attemptPort = (attempt == maxAttempts - 1) ? 
0 : zkPort; - try { - // Allow all commands on ZK control port - System.setProperty("zookeeper.4lw.commands.whitelist", "*"); - zks = new ZooKeeperServer(zkDataDir, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME); + try { + // Allow all commands on ZK control port + System.setProperty("zookeeper.4lw.commands.whitelist", "*"); + zks = new ZooKeeperServer(zkDataDir, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME); - serverFactory = new NIOServerCnxnFactory(); - serverFactory.configure(new InetSocketAddress(attemptPort), maxCC); - serverFactory.startup(zks); + serverFactory = new NIOServerCnxnFactory(); + serverFactory.configure(new InetSocketAddress(zkPort), maxCC); + serverFactory.startup(zks); - zkDataCleanupManager = new DatadirCleanupManager(zkDataDir, zkDataDir, 3, 1 /* hour */); - zkDataCleanupManager.start(); - lastBindException = null; - break; - } catch (BindException be) { - lastBindException = be; - log.warn().attr("port", attemptPort).attr("attempt", attempt + 1) - .exception(be).log("ZooKeeper bind failed, will retry"); - if (serverFactory != null) { - try { - serverFactory.shutdown(); - } catch (Exception ignored) { - // best effort - } - serverFactory = null; - } - if (zks != null) { - try { - zks.shutdown(); - } catch (Exception ignored) { - // best effort - } - zks = null; - } - } catch (Exception e) { - log.error().exception(e).log("Exception while instantiating ZooKeeper"); + zkDataCleanupManager = new DatadirCleanupManager(zkDataDir, zkDataDir, 3, 1 /* hour */); + zkDataCleanupManager.start(); + } catch (Exception e) { + log.error().exception(e).log("Exception while instantiating ZooKeeper"); - if (serverFactory != null) { - serverFactory.shutdown(); - } - throw new IOException(e); + if (serverFactory != null) { + serverFactory.shutdown(); } - } - if (lastBindException != null) { - throw new IOException("Unable to bind ZooKeeper after " + maxAttempts + " attempts", lastBindException); + throw new IOException(e); } this.zkPort = 
serverFactory.getLocalPort(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java index f437f62bd2a31..845beebc48298 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java @@ -42,16 +42,16 @@ public class ShadowTopicRealBkTest { private static final String cluster = "test"; - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, - PortManager.nextLockedFreePort(), PortManager::nextLockedFreePort); + // Pass 0 for the ZK port so the kernel picks a free port at bind time. This avoids the race + // where PortManager.nextLockedFreePort() reserves a port at the JVM level but another process + // grabs it before the bind. The actual bound port is read back via bk.getZookeeperPort(). + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0, PortManager::nextLockedFreePort); private PulsarService pulsar; private PulsarAdmin admin; @BeforeClass public void setup() throws Exception { bk.start(); - // Read the actual bound ZK port: LocalBookkeeperEnsemble may have retried with a different port - // if the original one was grabbed by another process between allocation and bind. 
final int zkPort = bk.getZookeeperPort(); final var config = new ServiceConfiguration(); config.setClusterName(cluster); From b69658384f96f6a6268a545f4f07af35fdc4c7bf Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 5 May 2026 17:05:45 -0700 Subject: [PATCH 3/3] Use () -> 0 for bookie port supplier instead of PortManager --- .../broker/service/persistent/ShadowTopicRealBkTest.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java index 845beebc48298..e58b8cdcce730 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java @@ -31,7 +31,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.awaitility.Awaitility; import org.testng.Assert; @@ -42,10 +41,10 @@ public class ShadowTopicRealBkTest { private static final String cluster = "test"; - // Pass 0 for the ZK port so the kernel picks a free port at bind time. This avoids the race - // where PortManager.nextLockedFreePort() reserves a port at the JVM level but another process - // grabs it before the bind. The actual bound port is read back via bk.getZookeeperPort(). - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0, PortManager::nextLockedFreePort); + // Pass 0 for both ZK and bookie ports so the kernel picks free ports at bind time, avoiding + // any JVM-vs-OS race on pre-allocated ports. The actual ZK port is read back via + // bk.getZookeeperPort(). 
+ private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); private PulsarService pulsar; private PulsarAdmin admin;