diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index b3c2b8de5bdef..d8c67568304a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -506,23 +506,47 @@ public void startBK() throws Exception { public void stop() throws Exception { if (null != streamStorage) { log.debug("Local bk stream storage stopping ..."); - streamStorage.close(); + try { + streamStorage.close(); + } catch (Exception e) { + log.warn().exception(e).log("failed to shutdown stream storage"); + } } log.debug("Local ZK/BK stopping ..."); - for (LifecycleComponent bookie : bookieComponents) { - try { - if (bookie != null) { - bookie.close(); + if (bookieComponents != null) { + for (LifecycleComponent bookie : bookieComponents) { + try { + if (bookie != null) { + bookie.close(); + } + } catch (Exception e) { + log.warn().exception(e).log("failed to shutdown bookie"); } - } catch (Exception e) { - log.warn().exception(e).log("failed to shutdown bookie"); } } - zkc.close(); - zks.shutdown(); - serverFactory.shutdown(); + if (zkc != null) { + try { + zkc.close(); + } catch (Exception e) { + log.warn().exception(e).log("failed to close zk client"); + } + } + if (zks != null) { + try { + zks.shutdown(); + } catch (Exception e) { + log.warn().exception(e).log("failed to shutdown zk server"); + } + } + if (serverFactory != null) { + try { + serverFactory.shutdown(); + } catch (Exception e) { + log.warn().exception(e).log("failed to shutdown zk server factory"); + } + } if (zkDataCleanupManager != null) { zkDataCleanupManager.shutdown(); @@ -530,7 +554,11 @@ public void stop() throws Exception { log.debug("Local ZK/BK stopped"); for (File managedDir : temporaryDirectories) { log.info().attr("directory", managedDir).log("deleting test directory"); - FileUtils.deleteDirectory(managedDir); + try { + FileUtils.deleteDirectory(managedDir); + } catch (Exception e) { + log.warn().attr("directory", managedDir).exception(e).log("failed to delete test directory"); + } } temporaryDirectories.clear(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java index b0e572a826c47..e58b8cdcce730 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java @@ -31,7 +31,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.awaitility.Awaitility; import org.testng.Assert; @@ -42,14 +41,17 @@ public class ShadowTopicRealBkTest { private static final String cluster = "test"; - private final int zkPort = PortManager.nextLockedFreePort(); - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort); + // Pass 0 for both ZK and bookie ports so the kernel picks free ports at bind time, avoiding + // any JVM-vs-OS race on pre-allocated ports. The actual ZK port is read back via + // bk.getZookeeperPort(). + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); private PulsarService pulsar; private PulsarAdmin admin; @BeforeClass public void setup() throws Exception { bk.start(); + final int zkPort = bk.getZookeeperPort(); final var config = new ServiceConfiguration(); config.setClusterName(cluster); config.setAdvertisedAddress("localhost"); @@ -68,7 +70,11 @@ public void setup() throws Exception { @AfterClass(alwaysRun = true) public void cleanup() throws Exception { if (pulsar != null) { - pulsar.close(); + try { + pulsar.close(); + } catch (Exception e) { + // best effort cleanup; setup may have failed before pulsar was fully initialized + } } bk.stop(); }