diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index 236f596c83c86..23ad9af3ad04d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -18,7 +18,6 @@ */ package org.apache.bookkeeper.mledger.impl; -import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; @@ -134,7 +133,7 @@ public void testBookieFailure() throws Exception { metadataStore.unsetAlwaysFail(); bkc = new PulsarBookKeeperTestClient(baseClientConf); - int port = startNewBookie(); + startNewBookie(); // Reconnect a new bk client factory.shutdown(); @@ -164,7 +163,6 @@ public void testBookieFailure() throws Exception { assertEquals("entry-2", new String(entries.get(0).getData())); entries.forEach(Entry::release); factory.shutdown(); - releaseLockedPort(port); } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 78891cdd7a473..7e5486eac293d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -25,7 +25,6 @@ package org.apache.bookkeeper.test; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertFalse; import com.google.common.base.Stopwatch; import java.io.File; @@ -296,14 +295,10 @@ protected void stopBKCluster() throws Exception { protected ServerConfiguration newServerConfiguration() throws Exception { File f = tmpDirs.createNew("bookie", "test"); - - int port; - if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) { - port = nextLockedFreePort(); - } else { - port = 0; - } - return newServerConfiguration(port, f, new File[] { f }); + // Bookies need a pre-allocated port: BK identifies them by host:port in metadata + // and the test client resolves that back to a TCP address. Port 0 would leave + // the cookie + registration with port=0, which fails DNS-style resolution. + return newServerConfiguration(PortManager.nextLockedFreePort(), f, new File[] { f }); } protected ClientConfiguration newClientConfiguration() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java index 0f4166be92213..b673126307934 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java @@ -84,10 +84,6 @@ public void setBkEnsemble(LocalBookkeeperEnsemble bkEnsemble) { this.bkEnsemble = bkEnsemble; } - public void setBkPort(int bkPort) { - this.bkPort = bkPort; - } - public void setBkDir(String bkDir) { this.bkDir = bkDir; } @@ -172,10 +168,6 @@ public int getZkPort() { return zkPort; } - public int getBkPort() { - return bkPort; - } - public String getZkDir() { return zkDir; } @@ -237,9 +229,6 @@ public boolean isHelp() { hidden = true) private int zkPort = 2181; - @Option(names = { "--bookkeeper-port" }, description = "Local bookies base port") - private int bkPort = 3181; - @Option(names = { "--zookeeper-dir" }, description = "Local zooKeeper's data directory", hidden = true) @@ -470,7 +459,6 @@ void startBookieWithMetadataStore() throws Exception { bkCluster = BKCluster.builder() .baseServerConfiguration(bkServerConf) .metadataServiceUri(metadataStoreUrl) - .bkPort(bkPort) .numBookies(numOfBk) .dataDir(bkDir) .clearOldData(wipeData) @@ -484,9 +472,9 @@ private void startBookieWithZookeeper() throws Exception { ServerConfiguration bkServerConf = new ServerConfiguration(); bkServerConf.loadConf(new File(configFile).toURI().toURL()); calculateCacheSize(bkServerConf); - // Start LocalBookKeeper + // Start LocalBookKeeper. Bookies bind to kernel-assigned ports. bkEnsemble = new LocalBookkeeperEnsemble( - this.getNumOfBk(), this.getZkPort(), this.getBkPort(), this.getStreamStoragePort(), this.getZkDir(), + this.getNumOfBk(), this.getZkPort(), this.getStreamStoragePort(), this.getZkDir(), this.getBkDir(), this.isWipeData(), "127.0.0.1"); bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage()); config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java index 5469d24782958..2cf91c564d5cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java @@ -56,11 +56,6 @@ public PulsarStandaloneBuilder withZkPort(int zkPort) { return this; } - public PulsarStandaloneBuilder withBkPort(int bkPort) { - pulsarStandalone.setBkPort(bkPort); - return this; - } - public PulsarStandaloneBuilder withZkDir(String zkDir) { pulsarStandalone.setZkDir(zkDir); return this; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index d8c67568304a6..2f68928d6d6c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -42,7 +42,6 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import java.util.stream.Stream; import java.util.stream.StreamSupport; import lombok.CustomLog; @@ -91,46 +90,18 @@ public class LocalBookkeeperEnsemble { int numberOfBookies; private final boolean clearOldData; - private static class BasePortManager implements Supplier { - - private int port; - - public BasePortManager(int basePort) { - this.port = basePort; - } - - @Override - public synchronized Integer get() { - return port++; - } + public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort) { + this(numberOfBookies, zkPort, 4181, null, null, true, null); } - private final Supplier portManager; - - public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, Supplier portManager) { - this(numberOfBookies, zkPort, 4181, null, null, true, null, portManager); - } - - public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName, + public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String zkDataDirName, String bkDataDirName, boolean clearOldData) { - this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, null); + this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, null); } - public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName, + public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String zkDataDirName, String bkDataDirName, boolean clearOldData, String advertisedAddress) { - this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress); - } - - public LocalBookkeeperEnsemble(int numberOfBookies, - int zkPort, - int bkBasePort, - int streamStoragePort, - String zkDataDirName, - String bkDataDirName, - boolean clearOldData, - String advertisedAddress) { - this(numberOfBookies, zkPort, streamStoragePort, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress, - bkBasePort != 0 ? new BasePortManager(bkBasePort) : () -> 0); + this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress); } public LocalBookkeeperEnsemble(int numberOfBookies, @@ -139,10 +110,8 @@ public LocalBookkeeperEnsemble(int numberOfBookies, String zkDataDirName, String bkDataDirName, boolean clearOldData, - String advertisedAddress, - Supplier portManager) { + String advertisedAddress) { this.numberOfBookies = numberOfBookies; - this.portManager = portManager; this.streamStoragePort = streamStoragePort; this.zkDataDirName = zkDataDirName; this.bkDataDirName = bkDataDirName; @@ -301,7 +270,8 @@ private void runBookies(ServerConfiguration baseConf) throws Exception { cleanDirectory(bkDataDir); } - int bookiePort = portManager.get(); + // Bookies bind to a kernel-assigned port; identity is established via bookieId. + int bookiePort = 0; String bookieId = "bk" + i + "test"; // Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully deleteBookieRegistrationZnode( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java index 77c1aced14df2..21b4fe43de268 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java @@ -54,7 +54,6 @@ public void testStandaloneWithRocksDB() throws Exception { PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args); standalone.setBkDir(tempDir.getAbsolutePath()); - standalone.setBkPort(0); standalone.setNumOfBk(bookieNum); standalone.startBookieWithMetadataStore(); @@ -67,10 +66,12 @@ public void testStandaloneWithRocksDB() throws Exception { List secondBsConfs = standalone.bkCluster.getBsConfs(); Assert.assertEquals(secondBsConfs.size(), bookieNum); + // Cookies must be preserved across restart (otherwise bookie startup would have failed + // with InvalidCookieException). The bookieId is the persistent identity. for (int i = 0; i < bookieNum; i++) { ServerConfiguration conf1 = firstBsConfs.get(i); ServerConfiguration conf2 = secondBsConfs.get(i); - Assert.assertEquals(conf1.getBookiePort(), conf2.getBookiePort()); + Assert.assertEquals(conf1.getBookieId(), conf2.getBookieId()); } standalone.close(); cleanDirectory(tempDir); @@ -93,7 +94,6 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex } final File bkDir = IOUtils.createTempDir("standalone", "bk"); standalone.setNumOfBk(1); - standalone.setBkPort(0); standalone.setBkDir(bkDir.getAbsolutePath()); standalone.start(); @@ -148,7 +148,6 @@ public void testShutdownHookClosesBkCluster() throws Exception { bkDir.getAbsolutePath() }); standalone.setTestMode(true); - standalone.setBkPort(0); standalone.start(); BKCluster bkCluster = standalone.bkCluster; standalone.runShutdownHook(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java index 076d649d5d983..203b8668288a1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Optional; import lombok.CustomLog; import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.common.util.OrderedExecutor; @@ -35,7 +34,6 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown; -import org.apache.pulsar.common.util.PortManager; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -46,6 +44,7 @@ public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest { protected List additionalBrokerAdmins; protected List additionalBrokerClients; protected PulsarMockBookKeeper mockBookKeeper; + // Populated after broker startup with kernel-assigned ports. protected int mainBrokerPort; protected List additionalBrokerPorts = new ArrayList<>(); @@ -53,17 +52,10 @@ protected int numberOfAdditionalBrokers() { return 2; } - protected boolean useDynamicBrokerPorts() { - return true; - } - @BeforeClass(alwaysRun = true) @Override public final void setup() throws Exception { beforeSetup(); - if (!useDynamicBrokerPorts()) { - mainBrokerPort = PortManager.nextLockedFreePort(); - } OrderedExecutor mockBookKeeperExecutor = OrderedExecutor.newBuilder().numThreads(1) .name(MultiBrokerBaseTest.class.getSimpleName() + "-bk-executor").build(); registerCloseable(() -> GracefulExecutorServicesShutdown.initiate() @@ -75,6 +67,7 @@ public final void setup() throws Exception { ((NonClosableMockBookKeeper) mockBookKeeper).reallyShutdown(); }); super.internalSetup(); + mainBrokerPort = pulsar.getBrokerListenPort().orElse(0); additionalBrokersSetup(); pulsarResourcesSetup(); additionalSetup(); @@ -89,14 +82,6 @@ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder p pulsarTestContextBuilder.bookKeeperClient(mockBookKeeper); } - @Override - protected void doInitConf() throws Exception { - super.doInitConf(); - if (!useDynamicBrokerPorts()) { - conf.setBrokerServicePort(Optional.of(mainBrokerPort)); - } - } - protected void additionalSetup() throws Exception { // override this method to add any additional setup logic @@ -115,17 +100,12 @@ protected void additionalBrokersSetup() throws Exception { additionalBrokerClients = new ArrayList<>(numberOfAdditionalBrokers); additionalPulsarTestContexts = new ArrayList<>(numberOfAdditionalBrokers); additionalBrokerPorts = new ArrayList<>(numberOfAdditionalBrokers); - if (!useDynamicBrokerPorts()) { - for (int i = 0; i < numberOfAdditionalBrokers; i++) { - int port = PortManager.nextLockedFreePort(); - additionalBrokerPorts.add(port); - } - } for (int i = 0; i < numberOfAdditionalBrokers; i++) { PulsarTestContext pulsarTestContext = createAdditionalBroker(i); additionalPulsarTestContexts.add(i, pulsarTestContext); PulsarService pulsarService = pulsarTestContext.getPulsarService(); additionalBrokers.add(i, pulsarService); + additionalBrokerPorts.add(pulsarService.getBrokerListenPort().orElse(0)); PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress() != null ? pulsarService.getWebServiceAddress() @@ -159,9 +139,6 @@ protected ServiceConfiguration createConfForAdditionalBroker(int additionalBroke protected PulsarTestContext createAdditionalBroker(int additionalBrokerIndex) throws Exception { ServiceConfiguration conf = createConfForAdditionalBroker(additionalBrokerIndex); - if (!useDynamicBrokerPorts()) { - conf.setBrokerServicePort(Optional.of(additionalBrokerPorts.get(additionalBrokerIndex))); - } return createAdditionalPulsarTestContext(conf); } @@ -175,14 +152,6 @@ public final void cleanup() throws Exception { log.warn().exception(e).log("Exception during additional cleanup"); } super.internalCleanup(); - if (!useDynamicBrokerPorts()) { - if (mainBrokerPort > 0) { - PortManager.releaseLockedPort(mainBrokerPort); - } - for (Integer port : additionalBrokerPorts) { - PortManager.releaseLockedPort(port); - } - } } protected void additionalCleanup() throws Exception { @@ -212,9 +181,6 @@ protected void additionalBrokersCleanup() { try { pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L); pulsarTestContext.close(); - pulsarService.getConfiguration().getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); - pulsarService.getConfiguration().getWebServicePort().ifPresent(PortManager::releaseLockedPort); - pulsarService.getConfiguration().getWebServicePortTls().ifPresent(PortManager::releaseLockedPort); } catch (Exception e) { log.warn().exception(e).log("Failed to stop additional broker"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index 523006e9d74a6..3426753fbc66e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -71,7 +71,7 @@ void setup() throws Exception { new LinkedBlockingQueue<>()); log.info("---- Initializing SLAMonitoringTest -----"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // start brokers diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java index 49e4c6b21035e..b4dec43054774 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -78,7 +78,7 @@ public class GetPartitionMetadataTest extends TestRetrySupport { @BeforeClass(alwaysRun = true) protected void setup() throws Exception { incrementSetupNumber(); - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker. setupBrokers(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java index 48cc5c6799394..3e2b0a4655c4b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java @@ -197,11 +197,6 @@ protected int numberOfAdditionalBrokers() { return 1; } - @Override - protected boolean useDynamicBrokerPorts() { - return false; - } - @BeforeMethod(alwaysRun = true) public final void doBeforeMethod() { beforeMethod(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java index d019af35a6928..64c0ee3f47ab9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java @@ -65,6 +65,8 @@ protected ServiceConfiguration createConfForAdditionalBroker(int additionalBroke } private void updateConfig(ServiceConfiguration conf, String advertisedAddress) { + // Pre-allocate ports because the advertised listener URLs are baked into config + // before the broker starts. The broker then binds to the same ports. int pulsarPort = nextLockedFreePort(); int httpPort = nextLockedFreePort(); int httpsPort = nextLockedFreePort(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java index 62571cc3ff40b..0bdeb4341d99e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java @@ -52,7 +52,7 @@ public class LeaderElectionServiceTest { @BeforeMethod(alwaysRun = true) public void setup() throws Exception { - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); log.info("---- bk started ----"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index c6617bbacaaee..cf8db26a4a6b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -107,7 +107,7 @@ public class LoadBalancerTest { @BeforeMethod void setup() throws Exception { // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); ZkUtils.createFullPathOptimistic(bkEnsemble.getZkClient(), SimpleLoadManagerImpl.LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java index 879afd00c1b78..36bd4b1e0fc16 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java @@ -48,7 +48,7 @@ public void testHasNICSpeed() throws Exception { } // Start local bookkeeper ensemble @Cleanup("stop") - LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker ServiceConfiguration config = new ServiceConfiguration(); @@ -79,7 +79,7 @@ public void testNoNICSpeed() throws Exception { } // Start local bookkeeper ensemble @Cleanup("stop") - LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker ServiceConfiguration config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index 6103b948236e6..20a5883d65df7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -124,7 +124,7 @@ public class SimpleLoadManagerImplTest { void setup() throws Exception { // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker 1 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java index 80e54278d31e4..138ad1dee1109 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java @@ -47,7 +47,7 @@ public class BrokerRegistryIntegrationTest { @BeforeClass protected void setup() throws Exception { - bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); + bk = new LocalBookkeeperEnsemble(2, 0); bk.start(); pulsar = new PulsarService(brokerConfig()); pulsar.start(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java index 0247a68256e69..73b50b4603a9a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java @@ -169,7 +169,7 @@ void setup() throws Exception { executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java index 50265c28edc7d..f0734fbe8bbe6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java @@ -47,7 +47,7 @@ public class ExtensibleLoadManagerCloseTest { @BeforeClass(alwaysRun = true) public void setup() throws Exception { - bk = new LocalBookkeeperEnsemble(1, 0, () -> 0); + bk = new LocalBookkeeperEnsemble(1, 0); bk.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java index f393e585896a2..656088c2ba5b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java @@ -18,12 +18,12 @@ */ package org.apache.pulsar.broker.loadbalance.extensions; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import java.util.Optional; import lombok.CustomLog; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.util.PortManager; import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; import org.testng.annotations.Test; @@ -46,8 +46,10 @@ public ExtensibleLoadManagerImplWithAdvertisedListenersTest(String serviceUnitSt @Override protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { super.updateConfig(conf); - int privatePulsarPort = nextLockedFreePort(); - int publicPulsarPort = nextLockedFreePort(); + // Pre-allocate ports because advertised listener URLs are baked into config + // before the broker starts. + int privatePulsarPort = PortManager.nextLockedFreePort(); + int publicPulsarPort = PortManager.nextLockedFreePort(); conf.setInternalListenerName("internal"); conf.setBindAddresses("external:pulsar://localhost:" + publicPulsarPort); conf.setAdvertisedListeners( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java index a400bf733e557..1dc254492b74f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java @@ -26,7 +26,6 @@ import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; -import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.awaitility.Awaitility; import org.mockito.Mockito; @@ -38,8 +37,7 @@ public class LoadManagerFailFastTest { private static final String cluster = "test"; - private final int zkPort = PortManager.nextLockedFreePort(); - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0); private final ServiceConfiguration config = new ServiceConfiguration(); @BeforeClass @@ -49,7 +47,7 @@ protected void setup() throws Exception { config.setAdvertisedAddress("localhost"); config.setBrokerServicePort(Optional.of(0)); config.setWebServicePort(Optional.of(0)); - config.setMetadataStoreUrl("zk:localhost:" + zkPort); + config.setMetadataStoreUrl("zk:localhost:" + bk.getZookeeperPort()); } @AfterClass diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java index 7a012b78756d4..085bea1170e8c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java @@ -59,7 +59,7 @@ public class BundleSplitterTaskTest { @BeforeMethod void setup() throws Exception { // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker ServiceConfiguration config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index c3b5a02e5ac01..ecabd901116f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -173,7 +173,7 @@ void setup() throws Exception { executor = new ThreadPoolExecutor(1, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker 1 @@ -947,25 +947,32 @@ public void testOwnBrokerZnodeByMultipleBroker() throws Exception { ServiceConfiguration config = new ServiceConfiguration(); config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); config.setClusterName("use"); - config.setWebServicePort(Optional.of(PortManager.nextLockedFreePort())); - config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); - config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); - config.setBrokerServicePort(Optional.of(0)); - PulsarService pulsar = new PulsarService(config); - // create znode using different zk-session - final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getAdvertisedAddress() + ":" - + config.getWebServicePort().get(); - pulsar1.getLocalMetadataStore() - .put(brokerZnode, new byte[0], Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join(); + // Pre-allocate a port: the test creates a znode at the broker's would-be address before + // starting the broker, so it needs to know the address up front. + int webPort = PortManager.nextLockedFreePort(); try { - pulsar.start(); - fail("should have failed"); - } catch (PulsarServerException e) { - //Ok. - } + config.setWebServicePort(Optional.of(webPort)); + config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); + config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); + config.setBrokerServicePort(Optional.of(0)); + PulsarService pulsar = new PulsarService(config); + // create znode using different zk-session + final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getAdvertisedAddress() + ":" + + config.getWebServicePort().get(); + pulsar1.getLocalMetadataStore() + .put(brokerZnode, new byte[0], Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join(); + try { + pulsar.start(); + fail("should have failed"); + } catch (PulsarServerException e) { + //Ok. + } - pulsar.close(); + pulsar.close(); + } finally { + PortManager.releaseLockedPort(webPort); + } } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java index 411c57c3f12e9..0c6797c93fafc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java @@ -21,7 +21,6 @@ import java.io.File; import java.util.Optional; import lombok.CustomLog; -import org.apache.bookkeeper.util.PortManager; import org.apache.commons.io.FileUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -38,8 +37,7 @@ public class PulsarClientBasedHandlerTest { private static final String clusterName = "cluster"; private static final int shutdownTimeoutMs = 100; - private final int zkPort = PortManager.nextFreePort(); - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextFreePort); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0); private File tempDirectory; private PulsarService pulsar; @@ -51,7 +49,7 @@ public void setup() throws Exception { config.setAdvertisedAddress("localhost"); config.setBrokerServicePort(Optional.of(0)); config.setWebServicePort(Optional.of(0)); - config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort); + config.setMetadataStoreUrl("zk:127.0.0.1:" + bk.getZookeeperPort()); tempDirectory = SimpleProtocolHandlerTestsBase.configureProtocolHandler(config, PulsarClientBasedHandler.class.getName(), true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java index 9e32f4be8be41..f1cb937868a4e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.protocol; import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; +import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; import static org.testng.Assert.assertEquals; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; @@ -46,7 +47,6 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.common.util.PortManager; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -88,6 +88,8 @@ public void start(BrokerService service) { @Override public Map> newChannelInitializers() { + // Pre-allocate a free port: protocol handlers need to register a listener at a known + // address before the broker calls back into them. int port = nextLockedFreePort(); this.ports.add(port); return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port), @@ -115,7 +117,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void close() { - ports.removeIf(PortManager::releaseLockedPort); + ports.removeIf(p -> releaseLockedPort(p)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java index 8dafe7a6af32d..d76c7fb19d2ab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java @@ -41,7 +41,7 @@ public class AdvertisedAddressTest { @BeforeMethod public void setup() throws Exception { - bkEnsemble = new LocalBookkeeperEnsemble(1, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(1, 0); bkEnsemble.start(); ServiceConfiguration config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 9a85b65b1e74b..cbc066bd99afa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -138,7 +138,7 @@ public Object[][] backlogQuotaSizeGB() { void setup() throws Exception { try { // start local bookie and zookeeper - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // start pulsar service diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java index 4a1fcbb04db30..2b01bbbb9d1b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java @@ -86,7 +86,7 @@ protected void setup() throws Exception { incrementSetupNumber(); try { // start local bookie and zookeeper - bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0); bkEnsemble.start(); // start pulsar service diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java index 2403675253fa4..24be9695184d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java @@ -97,7 +97,7 @@ public class BrokerBookieIsolationTest { @BeforeMethod protected void setup() throws Exception { // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(4, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(4, 0); bkEnsemble.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java index 29421f155b6db..b266710219822 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java @@ -37,7 +37,7 @@ public class BrokerEventLoopShutdownTest { @BeforeClass(alwaysRun = true) public void setup() throws Exception { - bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); + bk = new LocalBookkeeperEnsemble(2, 0); bk.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java index c0bd01c64a2bb..f9f0030c45fe6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java @@ -80,7 +80,7 @@ protected void startZKAndBK() throws Exception { brokerConfigZk.start(); // Start BK. - bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0); bkEnsemble.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java index 3b4bac53fec42..728af78999204 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java @@ -77,9 +77,9 @@ protected void startZKAndBK() throws Exception { brokerConfigZk2.start(); // Start BK. - bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble1.start(); - bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble2.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java index 61e454622907b..b4a7e4ef0ffd5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java @@ -54,7 +54,7 @@ public class MaxMessageSizeTest { @BeforeMethod void setup() { try { - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); ServerConfiguration conf = new ServerConfiguration(); conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024 + 10 * 1024); bkEnsemble.startStandalone(conf, false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java index 0d5ae2a4c0d26..de30a4c9d82c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java @@ -86,7 +86,7 @@ public abstract class NetworkErrorTestBase extends TestRetrySupport { protected void startZKAndBK() throws Exception { // Start ZK & BK. - bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble1.start(); metadataZKProxy = new Ipv4Proxy(getOneFreePort(), "127.0.0.1", bkEnsemble1.getZookeeperPort()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index a73a2844f79dd..0aec5358be6f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -114,9 +114,9 @@ protected void startZKAndBK() throws Exception { } // Start BK. - bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble1.start(); - bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble2.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 265d685223e75..755230a8537b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -155,7 +155,7 @@ protected void setup() throws Exception { globalZkS.start(); // Start region 1 - bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble1.start(); // NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have @@ -174,7 +174,7 @@ protected void setup() throws Exception { // Start region 2 // Start zk & bks - bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble2.start(); setConfig2DefaultValue(); @@ -190,7 +190,7 @@ protected void setup() throws Exception { // Start region 3 // Start zk & bks - bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble3.start(); setConfig3DefaultValue(); @@ -206,7 +206,7 @@ protected void setup() throws Exception { // Start region 4 // Start zk & bks - bkEnsemble4 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble4 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble4.start(); setConfig4DefaultValue(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java index 809f9a5a6e0ed..cccc3ceeb5260 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java @@ -71,7 +71,7 @@ public class TopicOwnerTest { void setup() throws Exception { log.info("---- Initializing TopicOwnerTest -----"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // start brokers diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java index e58b8cdcce730..9b52582f6f39d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java @@ -41,10 +41,9 @@ public class ShadowTopicRealBkTest { private static final String cluster = "test"; - // Pass 0 for both ZK and bookie ports so the kernel picks free ports at bind time, avoiding - // any JVM-vs-OS race on pre-allocated ports. The actual ZK port is read back via - // bk.getZookeeperPort(). - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); + // ZK port 0 lets the kernel pick a free port at bind time. The actual port is read back + // via bk.getZookeeperPort() after start. Bookies always bind to kernel-assigned ports. + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0); private PulsarService pulsar; private PulsarAdmin admin; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index ff344ebadea92..c7136fcb84ab4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -723,6 +723,11 @@ private TestZKServer createTestZookeeper(int sessionTimeout) throws Exception { protected void handlePreallocatePorts(ServiceConfiguration config) { if (super.preallocatePorts) { + // Pre-allocate ports for callers that need the port number BEFORE the broker + // starts (e.g. to build advertised-listener URLs). PortManager hands out ports + // outside the ephemeral range, so the kernel won't auto-assign them to other + // processes. Tests that don't need a pre-known port should leave + // preallocatePorts=false and let the broker bind to 0 directly. config.getBrokerServicePort().ifPresent(portNumber -> { if (portNumber == 0) { config.setBrokerServicePort(Optional.of(PortManager.nextLockedFreePort())); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java index 81321427a2b4b..904a4807ad391 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java @@ -50,7 +50,7 @@ public abstract class TransactionMetaStoreTestBase extends TestRetrySupport { protected final void setup() throws Exception { log.info().attr("class", getClass().getSimpleName()).log("---- Initializing -----"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); String[] args = new String[]{ diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java index 2602233293e78..6f173aa2ba04e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java @@ -76,7 +76,7 @@ void setup(Method method) throws Exception { log.info().attr("upMethod", method.getName()).log("--- Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java index 3fa6750c7de33..451a15ebe50ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java @@ -73,7 +73,7 @@ public class HybridTypesAcknowledgeTest extends TestRetrySupport { @BeforeClass(alwaysRun = true) protected void setup() throws Exception { incrementSetupNumber(); - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker. setupBrokers(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 97126457cd1ee..1552940cda407 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -989,7 +989,7 @@ void setupReplicationCluster() throws Exception { globalZkS.start(); // Start region 1 - bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble1.start(); // NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have @@ -1019,7 +1019,7 @@ void setupReplicationCluster() throws Exception { // Start region 2 // Start zk & bks - bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble2.start(); config2 = new ServiceConfiguration(); @@ -1046,7 +1046,7 @@ void setupReplicationCluster() throws Exception { // Start region 3 // Start zk & bks - bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble3.start(); config3 = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java index a1d31c0b9b5c4..db465e643acbb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java @@ -54,18 +54,21 @@ public class ServiceUrlQuarantineTest extends ProducerConsumerBase { private PulsarClientImpl pulsarClientWithHttpServiceUrlDisableQuarantine; private int brokerServicePort; private int webServicePort; - private final Set lockedFreePortSet = new HashSet<>(); + private final Set reservedPorts = new HashSet<>(); private static final int UNAVAILABLE_NODES = 20; private static final int TIMEOUT_MS = 500; @BeforeClass(alwaysRun = true) @Override protected void setup() throws Exception { + // Pre-allocate ports for the broker because they must match the URL the test builds below. this.brokerServicePort = nextLockedFreePort(); this.webServicePort = nextLockedFreePort(); super.internalSetup(); super.producerBaseSetup(); - // Create a Pulsar client with some unavailable nodes + // Build a service URL that includes a bunch of unavailable-node ports. PortManager + // hands out ports outside the ephemeral range, so nothing else is going to grab them + // and the connection attempts to those addresses will fail as expected. StringBuilder binaryServiceUrlBuilder = new StringBuilder(pulsar.getBrokerServiceUrl()); StringBuilder httpServiceUrlBuilder = new StringBuilder(pulsar.getWebServiceAddress()); for (int i = 0; i < UNAVAILABLE_NODES; i++) { @@ -98,9 +101,9 @@ protected void setup() throws Exception { } private int nextLockedFreePort() { - int newLockedFreePort = PortManager.nextLockedFreePort(); - this.lockedFreePortSet.add(newLockedFreePort); - return newLockedFreePort; + int port = PortManager.nextLockedFreePort(); + reservedPorts.add(port); + return port; } @Override @@ -133,9 +136,8 @@ protected void cleanup() throws Exception { if (pulsarClientWithHttpServiceUrlDisableQuarantine != null) { pulsarClientWithHttpServiceUrlDisableQuarantine.close(); } - for (Integer port : lockedFreePortSet) { - PortManager.releaseLockedPort(port); - } + reservedPorts.forEach(PortManager::releaseLockedPort); + reservedPorts.clear(); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java index 3fbd4564a922c..a39b1ef1b87a3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java @@ -119,7 +119,7 @@ void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java index 122a81d3779a0..b6952682fa114 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java @@ -202,7 +202,7 @@ void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index 1bbabca36139d..5450c0c17b3ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -124,7 +124,7 @@ void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 9f27350f73dbb..9727dfb8f41bc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.functions.worker; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -97,21 +96,22 @@ public class PulsarFunctionTlsTest { void setup() throws Exception { log.info("---- Initializing TopicOwnerTest -----"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // start brokers for (int i = 0; i < BROKER_COUNT; i++) { - int brokerPort = nextLockedFreePort(); - int webPort = nextLockedFreePort(); - ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setWebServicePort(Optional.empty()); - config.setWebServicePortTls(Optional.of(webPort)); + // Pre-allocate the TLS web port: PulsarService.initializeWorkerConfigFromBrokerConfig + // builds workerId = "c-{cluster}-fw-{host}-{port}" from the CONFIGURED port. Two + // brokers configured with port 0 would end up with the same workerId and the + // function-worker membership manager would never elect a leader. + config.setWebServicePortTls(Optional.of(PortManager.nextLockedFreePort())); config.setBrokerServicePort(Optional.empty()); - config.setBrokerServicePortTls(Optional.of(brokerPort)); + config.setBrokerServicePortTls(Optional.of(0)); config.setClusterName("my-cluster"); config.setAdvertisedAddress("localhost"); config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); @@ -220,11 +220,9 @@ void tearDown() throws Exception { } for (int i = 0; i < BROKER_COUNT; i++) { if (pulsarServices[i] != null) { - pulsarServices[i].close(); - pulsarServices[i].getConfiguration(). - getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); pulsarServices[i].getConfiguration() - .getWebServicePort().ifPresent(PortManager::releaseLockedPort); + .getWebServicePortTls().ifPresent(PortManager::releaseLockedPort); + pulsarServices[i].close(); pulsarServices[i] = null; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index 7766b8a5054b4..ae91b671643f0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -83,7 +83,7 @@ void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java index cc010b82d9b90..53705e13be689 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java @@ -114,7 +114,7 @@ public void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("--- Setting up method ---"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java index 9e4b12f0a46b2..2bda831a999df 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java @@ -93,7 +93,7 @@ void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index aa78d3aaa916b..f121566d03787 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -110,7 +110,7 @@ void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java index 3bc42cf03682d..65ad1e490c0ff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java @@ -42,7 +42,7 @@ public void testStartStop() throws Exception { final int numBk = 1; // Start local Bookies/ZooKeepers and confirm that they are running at specified ports - LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, 0, () -> 0); + LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, 0); ensemble.start(); assertTrue(ensemble.getZkServer().isRunning()); assertEquals(ensemble.getZkServer().getClientPort(), ensemble.getZookeeperPort()); @@ -60,8 +60,7 @@ public void testStartStop() throws Exception { public void testStartWithSpecifiedStreamStoragePort() throws Exception { LocalBookkeeperEnsemble ensemble = null; try { - ensemble = - new LocalBookkeeperEnsemble(1, 0, 0, 4182, null, null, true, null); + ensemble = new LocalBookkeeperEnsemble(1, 0, 4182, null, null, true, null); ensemble.startStandalone(new ServerConfiguration(), true); } finally { if (ensemble != null) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java index 612deb9e01727..fab5104e8c39c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java @@ -22,14 +22,18 @@ import java.util.HashSet; import java.util.Set; +/** + * Allocates ports for tests that need a known port number BEFORE binding (e.g. tests that build + * advertised-listener URLs, or that pre-create metadata znodes at the broker's would-be address). + * For everything else, prefer binding to port 0 and reading the kernel-assigned port back. + */ public class PortManager { private static final Set PORTS = new HashSet<>(); /** - * Return a locked available port. - * - * @return locked available port. + * Return a free port that is reserved for the caller until {@link #releaseLockedPort(int)} + * is invoked. */ public static synchronized int nextLockedFreePort() { int exceptionCount = 0; @@ -50,18 +54,16 @@ public static synchronized int nextLockedFreePort() { } /** - * Returns whether the port was released successfully. + * Release a previously locked port. * - * @return whether the release is successful. + * @return true if the port was previously locked by this manager */ public static synchronized boolean releaseLockedPort(int lockedPort) { return PORTS.remove(lockedPort); } /** - * Check port if locked. - * - * @return whether the port is locked. + * @return true if the port is currently locked by this manager */ public static synchronized boolean checkPortIfLocked(int lockedPort) { return PORTS.contains(lockedPort); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java index 408753300cc0d..b3f91617a5c1a 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java @@ -18,19 +18,41 @@ */ package org.apache.pulsar.common.util; -import static org.apache.pulsar.common.util.PortManager.checkPortIfLocked; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; -import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import org.testng.annotations.Test; public class PortManagerTest { + + @Test + public void allocatesAFreePort() { + int port = PortManager.nextLockedFreePort(); + try { + assertTrue(port > 0); + assertTrue(PortManager.checkPortIfLocked(port)); + } finally { + PortManager.releaseLockedPort(port); + } + } + + @Test + public void allocatesDistinctPorts() { + int p1 = PortManager.nextLockedFreePort(); + int p2 = PortManager.nextLockedFreePort(); + try { + assertNotEquals(p1, p2); + } finally { + PortManager.releaseLockedPort(p1); + PortManager.releaseLockedPort(p2); + } + } + @Test - public void testCheckPortIfLockedAndRemove() { - int port = nextLockedFreePort(); - assertTrue(checkPortIfLocked(port)); - assertTrue(releaseLockedPort(port)); - assertFalse(checkPortIfLocked(port)); + public void releasingMarksPortAsUnlocked() { + int port = PortManager.nextLockedFreePort(); + assertTrue(PortManager.checkPortIfLocked(port)); + assertTrue(PortManager.releaseLockedPort(port)); + assertFalse(PortManager.checkPortIfLocked(port)); } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java index 9ec10326c26c7..aafe4889cccbe 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java @@ -31,12 +31,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import lombok.CustomLog; import lombok.Getter; import org.apache.bookkeeper.bookie.BookieImpl; -import org.apache.bookkeeper.bookie.Cookie; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.allocator.PoolingPolicy; import org.apache.bookkeeper.common.component.Lifecycle; @@ -50,7 +47,6 @@ import org.apache.bookkeeper.server.conf.BookieConfiguration; import org.apache.bookkeeper.util.IOUtils; import org.apache.commons.io.FileUtils; -import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -74,7 +70,6 @@ public class BKCluster implements AutoCloseable { protected final ServerConfiguration baseConf; protected final ClientConfiguration baseClientConf; - private final List lockedPorts = new ArrayList<>(); private final AtomicBoolean closed = new AtomicBoolean(false); public static class BKClusterConf { @@ -83,7 +78,6 @@ public static class BKClusterConf { private String metadataServiceUri; private int numBookies = 1; private String dataDir; - private int bkPort = 0; private boolean clearOldData; @@ -107,11 +101,6 @@ public BKClusterConf dataDir(String dataDir) { return this; } - public BKClusterConf bkPort(int bkPort) { - this.bkPort = bkPort; - return this; - } - public BKClusterConf clearOldData(boolean clearOldData) { this.clearOldData = clearOldData; return this; @@ -163,8 +152,6 @@ public void close() throws Exception { } catch (Exception e) { log.error().exception(e).log("Got Exception while trying to stop BKCluster"); } - lockedPorts.forEach(PortManager::releaseLockedPort); - lockedPorts.clear(); // cleanup temp dirs try { cleanupTempDirs(); @@ -240,40 +227,23 @@ private ServerConfiguration newServerConfiguration(int index) throws Exception { cleanDirectory(dataDir); } - int port; - if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts() || clusterConf.bkPort == 0) { - port = PortManager.nextLockedFreePort(); - lockedPorts.add(port); - } else { - // bk 4.15 cookie validation finds the same ip:port in case of port 0 - // and 2nd bookie's cookie validation fails - port = clusterConf.bkPort; - } - File[] cookieDir = dataDir.listFiles((file) -> file.getName().equals("current")); - if (cookieDir != null && cookieDir.length > 0) { - String existBookieAddr = parseBookieAddressFromCookie(cookieDir[0]); - if (existBookieAddr != null) { - baseConf.setAdvertisedAddress(existBookieAddr.split(":")[0]); - port = Integer.parseInt(existBookieAddr.split(":")[1]); - } - } - return newServerConfiguration(port, dataDir, new File[]{dataDir}); - } - - private String parseBookieAddressFromCookie(File dir) throws IOException { - Cookie cookie = Cookie.readFromDirectory(dir); - Pattern pattern = Pattern.compile(".*bookieHost: \"(.*?)\".*", Pattern.DOTALL); - Matcher m = pattern.matcher(cookie.toString()); - return m.find() ? m.group(1) : null; + // Bookies bind to a kernel-assigned port. Identity is established via a `bookieId` + // derived from the data dir path: bookies with the same dir (e.g. on cluster restart) + // get the same id so cookies match, while bookies with different dirs (e.g. separate + // test runs sharing the same metadata store) get different ids and don't collide. + String bookieId = "bk-" + index + "-" + Integer.toHexString(dataDir.getAbsolutePath().hashCode()); + return newServerConfiguration(0, bookieId, dataDir, new File[]{dataDir}); } private ClientConfiguration newClientConfiguration() { return new ClientConfiguration(baseConf); } - private ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) { + private ServerConfiguration newServerConfiguration(int port, String bookieId, File journalDir, + File[] ledgerDirs) { ServerConfiguration conf = new ServerConfiguration(baseConf); conf.setBookiePort(port); + conf.setBookieId(bookieId); conf.setJournalDirName(journalDir.getPath()); String[] ledgerDirNames = new String[ledgerDirs.length]; for (int i = 0; i < ledgerDirs.length; i++) { @@ -373,7 +343,7 @@ private ServerConfiguration newBaseServerConfiguration() { confReturn.setAllowEphemeralPorts(true); confReturn.setJournalWriteData(false); confReturn.setProperty("journalPreAllocSizeMB", 1); - confReturn.setBookiePort(clusterConf.bkPort); + confReturn.setBookiePort(0); confReturn.setGcWaitTime(1000L); confReturn.setDiskUsageThreshold(0.999F); confReturn.setDiskUsageWarnThreshold(0.99F); diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java index 6159741f53577..612a05fba7ff0 100644 --- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java +++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java @@ -25,7 +25,6 @@ package org.apache.bookkeeper.replication; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertFalse; import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.MoreExecutors; @@ -316,14 +315,10 @@ protected void stopBKCluster() throws Exception { protected ServerConfiguration newServerConfiguration() throws Exception { File f = tmpDirs.createNew("bookie", "test"); - - int port; - if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) { - port = nextLockedFreePort(); - } else { - port = 0; - } - return newServerConfiguration(port, f, new File[] { f }); + // Bookies need a pre-allocated port: BK identifies them by host:port in metadata + // and the test client resolves that back to a TCP address. Port 0 would leave + // the cookie + registration with port=0, which fails DNS-style resolution. + return newServerConfiguration(PortManager.nextLockedFreePort(), f, new File[] { f }); } protected ClientConfiguration newClientConfiguration() { diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java index 3abd3580e860b..33080e42832bc 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java @@ -28,7 +28,6 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertFalse; import com.google.common.base.Stopwatch; import java.io.File; @@ -291,14 +290,10 @@ protected void stopBKCluster() throws Exception { protected ServerConfiguration newServerConfiguration() throws Exception { File f = tmpDirs.createNew("bookie", "test"); - - int port; - if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) { - port = nextLockedFreePort(); - } else { - port = 0; - } - return newServerConfiguration(port, f, new File[] { f }); + // Bookies need a pre-allocated port: BK identifies them by host:port in metadata + // and the test client resolves that back to a TCP address. Port 0 would leave + // the cookie + registration with port=0, which fails DNS-style resolution. + return newServerConfiguration(PortManager.nextLockedFreePort(), f, new File[] { f }); } protected ClientConfiguration newClientConfiguration() { diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java deleted file mode 100644 index c5ab8dd629ce6..0000000000000 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.Inet4Address; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.channels.FileLock; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import lombok.Cleanup; - -/** - * Port manager allows a base port to be specified on the commandline. Tests will then use ports, counting up from this - * base port. This allows multiple instances of the bookkeeper tests to run at once. - */ -public class PortManager { - - private static final String lockFilename = System.getProperty("test.lockFilename", - "/tmp/pulsar-test-port-manager.lock"); - private static final int BASE_PORT = Integer.parseInt(System.getProperty("test.basePort", "15000")); - - private static final int MAX_PORT = 32000; - - /** - * Return a TCP port that is currently unused. - * - * Keeps track of assigned ports and avoid race condition between different processes - */ - public static synchronized int nextFreePort() { - Path path = Paths.get(lockFilename); - - try { - @Cleanup - FileChannel fileChannel = FileChannel.open(path, - StandardOpenOption.CREATE, - StandardOpenOption.WRITE, - StandardOpenOption.READ); - - @Cleanup - FileLock lock = fileChannel.lock(); - - ByteBuffer buffer = ByteBuffer.allocate(32); - int len = fileChannel.read(buffer, 0L); - buffer.flip(); - - int lastUsedPort = BASE_PORT; - if (len > 0) { - byte[] bytes = new byte[buffer.remaining()]; - buffer.get(bytes); - String lastUsedPortStr = new String(bytes); - lastUsedPort = Integer.parseInt(lastUsedPortStr); - } - - int freePort = probeFreePort(lastUsedPort + 1); - - buffer.clear(); - buffer.put(Integer.toString(freePort).getBytes()); - buffer.flip(); - fileChannel.write(buffer, 0L); - fileChannel.truncate(buffer.position()); - fileChannel.force(true); - - return freePort; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private static final int MAX_PORT_CONFLICTS = 10; - - private static synchronized int probeFreePort(int port) { - int exceptionCount = 0; - while (true) { - if (port == MAX_PORT) { - // Rollover the port probe - port = BASE_PORT; - } - - try (Socket s = new Socket()) { - s.connect(new InetSocketAddress(Inet4Address.getLoopbackAddress(), port), 100); - - // If we succeed to connect it means the port is being used - - } catch (ConnectException e) { - return port; - } catch (Exception e) { - e.printStackTrace(); - } - - port++; - exceptionCount++; - if (exceptionCount > MAX_PORT_CONFLICTS) { - throw new RuntimeException("Failed to find an open port"); - } - } - } - - public static void main(String[] args) throws Exception { - while (true) { - System.out.println("Port: " + nextFreePort()); - Thread.sleep(100); - } - } -} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java index d6eaf5dcc66e6..c834e09bd8925 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java @@ -19,6 +19,7 @@ package org.apache.pulsar.proxy.extensions; import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; +import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; import io.netty.buffer.ByteBuf; @@ -50,7 +51,6 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; -import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.proxy.server.ProxyConfiguration; import org.apache.pulsar.proxy.server.ProxyService; @@ -90,6 +90,8 @@ public void start(ProxyService service) { @Override public Map> newChannelInitializers() { + // Pre-allocate a free port: extension handlers need to register a listener at a known + // address before the proxy calls back into them. int port = nextLockedFreePort(); this.ports.add(port); return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port), @@ -117,7 +119,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void close() { - ports.removeIf(PortManager::releaseLockedPort); + ports.removeIf(p -> releaseLockedPort(p)); } }