From 0383a8f75624601fa67f706d5f39a03268e3e17a Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 5 May 2026 18:36:36 -0700 Subject: [PATCH 1/7] [cleanup][test] Remove PortManager and use kernel-assigned ports everywhere MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `PortManager` (in pulsar-common) and the inner `BasePortManager` of `LocalBookkeeperEnsemble` allocated "free" ports via `ServerSocket(0)` and tracked them in a JVM-level lock set. The lock was JVM-only, so the OS-level port could still be grabbed by another process between allocation and bind, producing flaky `BindException`s in tests. The class has outlived its usefulness — modern BookKeeper identifies bookies by `bookieId`, not host:port, so port 0 (kernel-assigned) is fine across the board. Removed: - `pulsar-common/...PortManager.java` and its test. - `pulsar-package-management/...test/PortManager.java` (unused duplicate). - `BasePortManager` inner class in `LocalBookkeeperEnsemble`. - `bkBasePort` constructor variants of `LocalBookkeeperEnsemble` (no callers left after the standalone change). - `--bookkeeper-port` CLI option from `PulsarStandalone` (and its setter, getter, builder method, plus 3 test calls). Multi-bookie standalone now always uses kernel-assigned ports — bookies are identified by `bookieId`. - `useDynamicBrokerPorts()` toggle in `MultiBrokerBaseTest` (now always true in spirit; ports come from `getBrokerListenPort()` after start). - `bkPort` field in `BKCluster.BKClusterConf`. Refactored: - `BKCluster`: bookies bind to port 0; `bookieId` is derived from the data dir hash so multiple cluster instances on a shared metadata store don't collide on cookie validation, while restarts on the same dir keep cookies stable. The unused cookie-parsing port-recovery logic is dropped. - `MultiBrokerBaseTest`: `mainBrokerPort` and `additionalBrokerPorts` are populated post-start from `pulsar.getBrokerListenPort()` instead of pre-allocated. - `PulsarTestContext.handlePreallocatePorts`: inlined `ServerSocket(0)` allocation. Still needed by tests that build advertised-listener URLs before the broker starts. - 17 test files: `PortManager.nextLockedFreePort()` calls replaced with `0` (let kernel pick) or inline `ServerSocket(0)` (only where pre-allocation is genuinely needed, e.g. for advertised-listener URLs). - `PulsarStandaloneTest.testStandaloneWithRocksDB`: assertion switched from `getBookiePort()` (now returns kernel-assigned ports that change per restart) to `getBookieId()` (the persistent identity). Behavior change: `--bookkeeper-port` is gone from `pulsar standalone`. Bookies get kernel-assigned ports — no impact for normal usage since clients only connect to the broker. --- .../mledger/impl/ManagedLedgerBkTest.java | 4 +- .../test/BookKeeperClusterTestCase.java | 13 +- .../org/apache/pulsar/PulsarStandalone.java | 18 +-- .../pulsar/PulsarStandaloneBuilder.java | 5 - .../zookeeper/LocalBookkeeperEnsemble.java | 37 +---- .../apache/pulsar/PulsarStandaloneTest.java | 7 +- .../pulsar/broker/MultiBrokerBaseTest.java | 40 +----- ...stractBrokerEntryCacheMultiBrokerTest.java | 5 - .../loadbalance/AdvertisedListenersTest.java | 20 ++- ...anagerImplWithAdvertisedListenersTest.java | 18 ++- .../extensions/LoadManagerFailFastTest.java | 6 +- .../impl/ModularLoadManagerImplTest.java | 10 +- .../PulsarClientBasedHandlerTest.java | 6 +- .../SimpleProtocolHandlerTestsBase.java | 16 ++- .../persistent/ShadowTopicRealBkTest.java | 7 +- .../broker/testcontext/PulsarTestContext.java | 28 ++-- .../client/impl/ServiceUrlQuarantineTest.java | 30 +++-- .../worker/PulsarFunctionTlsTest.java | 13 +- .../LocalBookkeeperEnsembleTest.java | 3 +- .../pulsar/common/util/PortManager.java | 69 ---------- .../pulsar/common/util/PortManagerTest.java | 36 ----- .../pulsar/metadata/bookkeeper/BKCluster.java | 50 ++----- .../BookKeeperClusterTestCase.java | 13 +- .../test/BookKeeperClusterTestCase.java | 13 +- .../bookkeeper/test/PortManager.java | 126 ------------------ .../SimpleProxyExtensionTestBase.java | 16 ++- 26 files changed, 140 insertions(+), 469 deletions(-) delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java delete mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java delete mode 100644 pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index 236f596c83c86..23ad9af3ad04d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -18,7 +18,6 @@ */ package org.apache.bookkeeper.mledger.impl; -import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; @@ -134,7 +133,7 @@ public void testBookieFailure() throws Exception { metadataStore.unsetAlwaysFail(); bkc = new PulsarBookKeeperTestClient(baseClientConf); - int port = startNewBookie(); + startNewBookie(); // Reconnect a new bk client factory.shutdown(); @@ -164,7 +163,6 @@ public void testBookieFailure() throws Exception { assertEquals("entry-2", new String(entries.get(0).getData())); entries.forEach(Entry::release); factory.shutdown(); - releaseLockedPort(port); } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 78891cdd7a473..378f7720d5f54 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -25,7 +25,6 @@ package org.apache.bookkeeper.test; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertFalse; import com.google.common.base.Stopwatch; import java.io.File; @@ -65,7 +64,6 @@ import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.replication.Auditor; import org.apache.bookkeeper.replication.ReplicationWorker; -import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; @@ -291,19 +289,12 @@ protected void stopBKCluster() throws Exception { t.shutdown(); } servers.clear(); - bookiePorts.removeIf(PortManager::releaseLockedPort); + bookiePorts.clear(); } protected ServerConfiguration newServerConfiguration() throws Exception { File f = tmpDirs.createNew("bookie", "test"); - - int port; - if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) { - port = nextLockedFreePort(); - } else { - port = 0; - } - return newServerConfiguration(port, f, new File[] { f }); + return newServerConfiguration(0, f, new File[] { f }); } protected ClientConfiguration newClientConfiguration() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java index 0f4166be92213..d6f6a4399f418 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java @@ -84,10 +84,6 @@ public void setBkEnsemble(LocalBookkeeperEnsemble bkEnsemble) { this.bkEnsemble = bkEnsemble; } - public void setBkPort(int bkPort) { - this.bkPort = bkPort; - } - public void setBkDir(String bkDir) { this.bkDir = bkDir; } @@ -172,10 +168,6 @@ public int getZkPort() { return zkPort; } - public int getBkPort() { - return bkPort; - } - public String getZkDir() { return zkDir; } @@ -237,9 +229,6 @@ public boolean isHelp() { hidden = true) private int zkPort = 2181; - @Option(names = { "--bookkeeper-port" }, description = "Local bookies base port") - private int bkPort = 3181; - @Option(names = { "--zookeeper-dir" }, description = "Local zooKeeper's data directory", hidden = true) @@ -470,7 +459,6 @@ void startBookieWithMetadataStore() throws Exception { bkCluster = BKCluster.builder() .baseServerConfiguration(bkServerConf) .metadataServiceUri(metadataStoreUrl) - .bkPort(bkPort) .numBookies(numOfBk) .dataDir(bkDir) .clearOldData(wipeData) @@ -484,10 +472,10 @@ private void startBookieWithZookeeper() throws Exception { ServerConfiguration bkServerConf = new ServerConfiguration(); bkServerConf.loadConf(new File(configFile).toURI().toURL()); calculateCacheSize(bkServerConf); - // Start LocalBookKeeper + // Start LocalBookKeeper. Bookies bind to kernel-assigned ports (() -> 0). bkEnsemble = new LocalBookkeeperEnsemble( - this.getNumOfBk(), this.getZkPort(), this.getBkPort(), this.getStreamStoragePort(), this.getZkDir(), - this.getBkDir(), this.isWipeData(), "127.0.0.1"); + this.getNumOfBk(), this.getZkPort(), this.getStreamStoragePort(), this.getZkDir(), + this.getBkDir(), this.isWipeData(), "127.0.0.1", () -> 0); bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage()); config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java index 5469d24782958..2cf91c564d5cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java @@ -56,11 +56,6 @@ public PulsarStandaloneBuilder withZkPort(int zkPort) { return this; } - public PulsarStandaloneBuilder withBkPort(int bkPort) { - pulsarStandalone.setBkPort(bkPort); - return this; - } - public PulsarStandaloneBuilder withZkDir(String zkDir) { pulsarStandalone.setZkDir(zkDir); return this; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index b3c2b8de5bdef..262b4a7b9b9db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -91,46 +91,23 @@ public class LocalBookkeeperEnsemble { int numberOfBookies; private final boolean clearOldData; - private static class BasePortManager implements Supplier { - - private int port; - - public BasePortManager(int basePort) { - this.port = basePort; - } - - @Override - public synchronized Integer get() { - return port++; - } - } - + // Supplier of bookie ports. Tests typically use `() -> 0` so the kernel picks a free port at + // bind time. Bookies are identified by `bookieId`, not host:port, so multiple ephemeral + // bookies coexist fine. private final Supplier portManager; public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, Supplier portManager) { this(numberOfBookies, zkPort, 4181, null, null, true, null, portManager); } - public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName, + public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String zkDataDirName, String bkDataDirName, boolean clearOldData) { - this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, null); + this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, null, () -> 0); } - public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName, + public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String zkDataDirName, String bkDataDirName, boolean clearOldData, String advertisedAddress) { - this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress); - } - - public LocalBookkeeperEnsemble(int numberOfBookies, - int zkPort, - int bkBasePort, - int streamStoragePort, - String zkDataDirName, - String bkDataDirName, - boolean clearOldData, - String advertisedAddress) { - this(numberOfBookies, zkPort, streamStoragePort, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress, - bkBasePort != 0 ? new BasePortManager(bkBasePort) : () -> 0); + this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress, () -> 0); } public LocalBookkeeperEnsemble(int numberOfBookies, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java index 77c1aced14df2..21b4fe43de268 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java @@ -54,7 +54,6 @@ public void testStandaloneWithRocksDB() throws Exception { PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args); standalone.setBkDir(tempDir.getAbsolutePath()); - standalone.setBkPort(0); standalone.setNumOfBk(bookieNum); standalone.startBookieWithMetadataStore(); @@ -67,10 +66,12 @@ public void testStandaloneWithRocksDB() throws Exception { List secondBsConfs = standalone.bkCluster.getBsConfs(); Assert.assertEquals(secondBsConfs.size(), bookieNum); + // Cookies must be preserved across restart (otherwise bookie startup would have failed + // with InvalidCookieException). The bookieId is the persistent identity. for (int i = 0; i < bookieNum; i++) { ServerConfiguration conf1 = firstBsConfs.get(i); ServerConfiguration conf2 = secondBsConfs.get(i); - Assert.assertEquals(conf1.getBookiePort(), conf2.getBookiePort()); + Assert.assertEquals(conf1.getBookieId(), conf2.getBookieId()); } standalone.close(); cleanDirectory(tempDir); @@ -93,7 +94,6 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex } final File bkDir = IOUtils.createTempDir("standalone", "bk"); standalone.setNumOfBk(1); - standalone.setBkPort(0); standalone.setBkDir(bkDir.getAbsolutePath()); standalone.start(); @@ -148,7 +148,6 @@ public void testShutdownHookClosesBkCluster() throws Exception { bkDir.getAbsolutePath() }); standalone.setTestMode(true); - standalone.setBkPort(0); standalone.start(); BKCluster bkCluster = standalone.bkCluster; standalone.runShutdownHook(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java index 076d649d5d983..203b8668288a1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Optional; import lombok.CustomLog; import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.common.util.OrderedExecutor; @@ -35,7 +34,6 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown; -import org.apache.pulsar.common.util.PortManager; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -46,6 +44,7 @@ public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest { protected List additionalBrokerAdmins; protected List additionalBrokerClients; protected PulsarMockBookKeeper mockBookKeeper; + // Populated after broker startup with kernel-assigned ports. protected int mainBrokerPort; protected List additionalBrokerPorts = new ArrayList<>(); @@ -53,17 +52,10 @@ protected int numberOfAdditionalBrokers() { return 2; } - protected boolean useDynamicBrokerPorts() { - return true; - } - @BeforeClass(alwaysRun = true) @Override public final void setup() throws Exception { beforeSetup(); - if (!useDynamicBrokerPorts()) { - mainBrokerPort = PortManager.nextLockedFreePort(); - } OrderedExecutor mockBookKeeperExecutor = OrderedExecutor.newBuilder().numThreads(1) .name(MultiBrokerBaseTest.class.getSimpleName() + "-bk-executor").build(); registerCloseable(() -> GracefulExecutorServicesShutdown.initiate() @@ -75,6 +67,7 @@ public final void setup() throws Exception { ((NonClosableMockBookKeeper) mockBookKeeper).reallyShutdown(); }); super.internalSetup(); + mainBrokerPort = pulsar.getBrokerListenPort().orElse(0); additionalBrokersSetup(); pulsarResourcesSetup(); additionalSetup(); @@ -89,14 +82,6 @@ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder p pulsarTestContextBuilder.bookKeeperClient(mockBookKeeper); } - @Override - protected void doInitConf() throws Exception { - super.doInitConf(); - if (!useDynamicBrokerPorts()) { - conf.setBrokerServicePort(Optional.of(mainBrokerPort)); - } - } - protected void additionalSetup() throws Exception { // override this method to add any additional setup logic @@ -115,17 +100,12 @@ protected void additionalBrokersSetup() throws Exception { additionalBrokerClients = new ArrayList<>(numberOfAdditionalBrokers); additionalPulsarTestContexts = new ArrayList<>(numberOfAdditionalBrokers); additionalBrokerPorts = new ArrayList<>(numberOfAdditionalBrokers); - if (!useDynamicBrokerPorts()) { - for (int i = 0; i < numberOfAdditionalBrokers; i++) { - int port = PortManager.nextLockedFreePort(); - additionalBrokerPorts.add(port); - } - } for (int i = 0; i < numberOfAdditionalBrokers; i++) { PulsarTestContext pulsarTestContext = createAdditionalBroker(i); additionalPulsarTestContexts.add(i, pulsarTestContext); PulsarService pulsarService = pulsarTestContext.getPulsarService(); additionalBrokers.add(i, pulsarService); + additionalBrokerPorts.add(pulsarService.getBrokerListenPort().orElse(0)); PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress() != null ? pulsarService.getWebServiceAddress() @@ -159,9 +139,6 @@ protected ServiceConfiguration createConfForAdditionalBroker(int additionalBroke protected PulsarTestContext createAdditionalBroker(int additionalBrokerIndex) throws Exception { ServiceConfiguration conf = createConfForAdditionalBroker(additionalBrokerIndex); - if (!useDynamicBrokerPorts()) { - conf.setBrokerServicePort(Optional.of(additionalBrokerPorts.get(additionalBrokerIndex))); - } return createAdditionalPulsarTestContext(conf); } @@ -175,14 +152,6 @@ public final void cleanup() throws Exception { log.warn().exception(e).log("Exception during additional cleanup"); } super.internalCleanup(); - if (!useDynamicBrokerPorts()) { - if (mainBrokerPort > 0) { - PortManager.releaseLockedPort(mainBrokerPort); - } - for (Integer port : additionalBrokerPorts) { - PortManager.releaseLockedPort(port); - } - } } protected void additionalCleanup() throws Exception { @@ -212,9 +181,6 @@ protected void additionalBrokersCleanup() { try { pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L); pulsarTestContext.close(); - pulsarService.getConfiguration().getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); - pulsarService.getConfiguration().getWebServicePort().ifPresent(PortManager::releaseLockedPort); - pulsarService.getConfiguration().getWebServicePortTls().ifPresent(PortManager::releaseLockedPort); } catch (Exception e) { log.warn().exception(e).log("Failed to stop additional broker"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java index 48cc5c6799394..3e2b0a4655c4b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java @@ -197,11 +197,6 @@ protected int numberOfAdditionalBrokers() { return 1; } - @Override - protected boolean useDynamicBrokerPorts() { - return false; - } - @BeforeMethod(alwaysRun = true) public final void doBeforeMethod() { beforeMethod(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java index d019af35a6928..ba0edcf8dbb19 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java @@ -18,9 +18,11 @@ */ package org.apache.pulsar.broker.loadbalance; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.ServerSocket; import java.net.URI; import java.util.Optional; import lombok.Cleanup; @@ -65,9 +67,11 @@ protected ServiceConfiguration createConfForAdditionalBroker(int additionalBroke } private void updateConfig(ServiceConfiguration conf, String advertisedAddress) { - int pulsarPort = nextLockedFreePort(); - int httpPort = nextLockedFreePort(); - int httpsPort = nextLockedFreePort(); + // Pre-allocate ports because the advertised listener URLs are baked into config + // before the broker starts. The broker then binds to the same ports. + int pulsarPort = allocateFreePort(); + int httpPort = allocateFreePort(); + int httpsPort = allocateFreePort(); // Use invalid domain name as identifier and instead make sure the advertised listeners work as intended conf.setAdvertisedAddress(advertisedAddress); @@ -79,6 +83,14 @@ private void updateConfig(ServiceConfiguration conf, String advertisedAddress) { conf.setWebServicePort(Optional.of(httpPort)); } + private static int allocateFreePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + @Test public void testLookup() throws Exception { HttpGet request = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java index f393e585896a2..ec4fedfd3a9bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.broker.loadbalance.extensions; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.ServerSocket; import java.util.Optional; import lombok.CustomLog; import org.apache.pulsar.broker.ServiceConfiguration; @@ -46,8 +48,10 @@ public ExtensibleLoadManagerImplWithAdvertisedListenersTest(String serviceUnitSt @Override protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { super.updateConfig(conf); - int privatePulsarPort = nextLockedFreePort(); - int publicPulsarPort = nextLockedFreePort(); + // Pre-allocate ports because advertised listener URLs are baked into config + // before the broker starts. + int privatePulsarPort = allocateFreePort(); + int publicPulsarPort = allocateFreePort(); conf.setInternalListenerName("internal"); conf.setBindAddresses("external:pulsar://localhost:" + publicPulsarPort); conf.setAdvertisedListeners( @@ -61,6 +65,14 @@ protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { return conf; } + private static int allocateFreePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + @DataProvider(name = "isPersistentTopicSubscriptionTypeTest") public Object[][] isPersistentTopicSubscriptionTypeTest() { return new Object[][]{ diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java index a400bf733e557..b57c4f72c8124 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java @@ -26,7 +26,6 @@ import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; -import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.awaitility.Awaitility; import org.mockito.Mockito; @@ -38,8 +37,7 @@ public class LoadManagerFailFastTest { private static final String cluster = "test"; - private final int zkPort = PortManager.nextLockedFreePort(); - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); private final ServiceConfiguration config = new ServiceConfiguration(); @BeforeClass @@ -49,7 +47,7 @@ protected void setup() throws Exception { config.setAdvertisedAddress("localhost"); config.setBrokerServicePort(Optional.of(0)); config.setWebServicePort(Optional.of(0)); - config.setMetadataStoreUrl("zk:localhost:" + zkPort); + config.setMetadataStoreUrl("zk:localhost:" + bk.getZookeeperPort()); } @AfterClass diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index c3b5a02e5ac01..fcfcabd97c588 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -88,7 +88,7 @@ import org.apache.pulsar.common.policies.data.ResourceQuota; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.apache.pulsar.common.util.PortManager; +import java.net.ServerSocket; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; @@ -947,7 +947,13 @@ public void testOwnBrokerZnodeByMultipleBroker() throws Exception { ServiceConfiguration config = new ServiceConfiguration(); config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); config.setClusterName("use"); - config.setWebServicePort(Optional.of(PortManager.nextLockedFreePort())); + // Pre-allocate a port: the test creates a znode at the broker's would-be address before + // starting the broker, so it needs to know the address up front. + int webPort; + try (ServerSocket socket = new ServerSocket(0)) { + webPort = socket.getLocalPort(); + } + config.setWebServicePort(Optional.of(webPort)); config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java index 411c57c3f12e9..3e8c28d8de7c8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java @@ -21,7 +21,6 @@ import java.io.File; import java.util.Optional; import lombok.CustomLog; -import org.apache.bookkeeper.util.PortManager; import org.apache.commons.io.FileUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -38,8 +37,7 @@ public class PulsarClientBasedHandlerTest { private static final String clusterName = "cluster"; private static final int shutdownTimeoutMs = 100; - private final int zkPort = PortManager.nextFreePort(); - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextFreePort); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); private File tempDirectory; private PulsarService pulsar; @@ -51,7 +49,7 @@ public void setup() throws Exception { config.setAdvertisedAddress("localhost"); config.setBrokerServicePort(Optional.of(0)); config.setWebServicePort(Optional.of(0)); - config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort); + config.setMetadataStoreUrl("zk:127.0.0.1:" + bk.getZookeeperPort()); tempDirectory = SimpleProtocolHandlerTestsBase.configureProtocolHandler(config, PulsarClientBasedHandler.class.getName(), true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java index 9e32f4be8be41..cf4a329f5cd5f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.broker.protocol; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.ServerSocket; import static org.testng.Assert.assertEquals; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; @@ -46,7 +48,6 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.common.util.PortManager; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -88,7 +89,14 @@ public void start(BrokerService service) { @Override public Map> newChannelInitializers() { - int port = nextLockedFreePort(); + // Pre-allocate a free port: protocol handlers need to register a listener at a known + // address before the broker calls back into them. + int port; + try (ServerSocket socket = new ServerSocket(0)) { + port = socket.getLocalPort(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } this.ports.add(port); return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port), new ChannelInitializer() { @@ -115,7 +123,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void close() { - ports.removeIf(PortManager::releaseLockedPort); + ports.clear(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java index b0e572a826c47..8f89bd74aabfb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java @@ -31,7 +31,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.awaitility.Awaitility; import org.testng.Assert; @@ -42,14 +41,16 @@ public class ShadowTopicRealBkTest { private static final String cluster = "test"; - private final int zkPort = PortManager.nextLockedFreePort(); - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort); + // Both ZK and bookie ports use 0 so the kernel picks free ports at bind time. The actual + // ZK port is read back via bk.getZookeeperPort() after start. + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); private PulsarService pulsar; private PulsarAdmin admin; @BeforeClass public void setup() throws Exception { bk.start(); + final int zkPort = bk.getZookeeperPort(); final var config = new ServiceConfiguration(); config.setClusterName(cluster); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index ff344ebadea92..8e22fad2a76b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -62,7 +62,6 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage; import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass; import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown; -import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; @@ -723,32 +722,39 @@ private TestZKServer createTestZookeeper(int sessionTimeout) throws Exception { protected void handlePreallocatePorts(ServiceConfiguration config) { if (super.preallocatePorts) { + // Pre-allocate ports for callers that need the port number BEFORE the broker + // starts (e.g. to build advertised-listener URLs). We open a ServerSocket on + // port 0, take the kernel-assigned port, and close the socket. There is a brief + // window where another process could grab the same port; tests that don't need + // a pre-known port should leave preallocatePorts=false and bind to 0 directly. config.getBrokerServicePort().ifPresent(portNumber -> { if (portNumber == 0) { - config.setBrokerServicePort(Optional.of(PortManager.nextLockedFreePort())); + config.setBrokerServicePort(Optional.of(allocateFreePort())); } }); config.getBrokerServicePortTls().ifPresent(portNumber -> { if (portNumber == 0) { - config.setBrokerServicePortTls(Optional.of(PortManager.nextLockedFreePort())); + config.setBrokerServicePortTls(Optional.of(allocateFreePort())); } }); config.getWebServicePort().ifPresent(portNumber -> { if (portNumber == 0) { - config.setWebServicePort(Optional.of(PortManager.nextLockedFreePort())); + config.setWebServicePort(Optional.of(allocateFreePort())); } }); config.getWebServicePortTls().ifPresent(portNumber -> { if (portNumber == 0) { - config.setWebServicePortTls(Optional.of(PortManager.nextLockedFreePort())); + config.setWebServicePortTls(Optional.of(allocateFreePort())); } }); - registerCloseable(() -> { - config.getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); - config.getBrokerServicePortTls().ifPresent(PortManager::releaseLockedPort); - config.getWebServicePort().ifPresent(PortManager::releaseLockedPort); - config.getWebServicePortTls().ifPresent(PortManager::releaseLockedPort); - }); + } + } + + private static int allocateFreePort() { + try (java.net.ServerSocket socket = new java.net.ServerSocket(0)) { + return socket.getLocalPort(); + } catch (java.io.IOException e) { + throw new java.io.UncheckedIOException(e); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java index a1d31c0b9b5c4..fae597d699dd7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java @@ -37,8 +37,10 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionMode; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.ServerSocket; import org.apache.pulsar.common.net.ServiceURI; -import org.apache.pulsar.common.util.PortManager; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -54,23 +56,24 @@ public class ServiceUrlQuarantineTest extends ProducerConsumerBase { private PulsarClientImpl pulsarClientWithHttpServiceUrlDisableQuarantine; private int brokerServicePort; private int webServicePort; - private final Set lockedFreePortSet = new HashSet<>(); private static final int UNAVAILABLE_NODES = 20; private static final int TIMEOUT_MS = 500; @BeforeClass(alwaysRun = true) @Override protected void setup() throws Exception { - this.brokerServicePort = nextLockedFreePort(); - this.webServicePort = nextLockedFreePort(); + // Pre-allocate ports for the broker because they must match the URL the test builds below. + this.brokerServicePort = allocateFreePort(); + this.webServicePort = allocateFreePort(); super.internalSetup(); super.producerBaseSetup(); - // Create a Pulsar client with some unavailable nodes + // Create a Pulsar client with some unavailable nodes (port allocated, socket closed — + // nothing is bound there during the test). StringBuilder binaryServiceUrlBuilder = new StringBuilder(pulsar.getBrokerServiceUrl()); StringBuilder httpServiceUrlBuilder = new StringBuilder(pulsar.getWebServiceAddress()); for (int i = 0; i < UNAVAILABLE_NODES; i++) { - binaryServiceUrlBuilder.append(",127.0.0.1:").append(nextLockedFreePort()); - httpServiceUrlBuilder.append(",127.0.0.1:").append(nextLockedFreePort()); + binaryServiceUrlBuilder.append(",127.0.0.1:").append(allocateFreePort()); + httpServiceUrlBuilder.append(",127.0.0.1:").append(allocateFreePort()); } this.binaryServiceUrlWithUnavailableNodes = binaryServiceUrlBuilder.toString(); this.httpServiceUrlWithUnavailableNodes = httpServiceUrlBuilder.toString(); @@ -97,10 +100,12 @@ protected void setup() throws Exception { .build(); } - private int nextLockedFreePort() { - int newLockedFreePort = PortManager.nextLockedFreePort(); - this.lockedFreePortSet.add(newLockedFreePort); - return newLockedFreePort; + private static int allocateFreePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } @Override @@ -133,9 +138,6 @@ protected void cleanup() throws Exception { if (pulsarClientWithHttpServiceUrlDisableQuarantine != null) { pulsarClientWithHttpServiceUrlDisableQuarantine.close(); } - for (Integer port : lockedFreePortSet) { - PortManager.releaseLockedPort(port); - } } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 9f27350f73dbb..6963e880b7973 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.functions.worker; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -49,7 +48,6 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; @@ -102,16 +100,13 @@ void setup() throws Exception { // start brokers for (int i = 0; i < BROKER_COUNT; i++) { - int brokerPort = nextLockedFreePort(); - int webPort = nextLockedFreePort(); - ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setWebServicePort(Optional.empty()); - config.setWebServicePortTls(Optional.of(webPort)); + config.setWebServicePortTls(Optional.of(0)); config.setBrokerServicePort(Optional.empty()); - config.setBrokerServicePortTls(Optional.of(brokerPort)); + config.setBrokerServicePortTls(Optional.of(0)); config.setClusterName("my-cluster"); config.setAdvertisedAddress("localhost"); config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); @@ -221,10 +216,6 @@ void tearDown() throws Exception { for (int i = 0; i < BROKER_COUNT; i++) { if (pulsarServices[i] != null) { pulsarServices[i].close(); - pulsarServices[i].getConfiguration(). - getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); - pulsarServices[i].getConfiguration() - .getWebServicePort().ifPresent(PortManager::releaseLockedPort); pulsarServices[i] = null; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java index 3bc42cf03682d..276a911313d73 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java @@ -60,8 +60,7 @@ public void testStartStop() throws Exception { public void testStartWithSpecifiedStreamStoragePort() throws Exception { LocalBookkeeperEnsemble ensemble = null; try { - ensemble = - new LocalBookkeeperEnsemble(1, 0, 0, 4182, null, null, true, null); + ensemble = new LocalBookkeeperEnsemble(1, 0, 4182, null, null, true, null, () -> 0); ensemble.startStandalone(new ServerConfiguration(), true); } finally { if (ensemble != null) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java deleted file mode 100644 index 612deb9e01727..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util; - -import java.net.ServerSocket; -import java.util.HashSet; -import java.util.Set; - -public class PortManager { - - private static final Set PORTS = new HashSet<>(); - - /** - * Return a locked available port. - * - * @return locked available port. - */ - public static synchronized int nextLockedFreePort() { - int exceptionCount = 0; - while (true) { - try (ServerSocket ss = new ServerSocket(0)) { - int port = ss.getLocalPort(); - if (!checkPortIfLocked(port)) { - PORTS.add(port); - return port; - } - } catch (Exception e) { - exceptionCount++; - if (exceptionCount > 100) { - throw new RuntimeException("Unable to allocate socket port", e); - } - } - } - } - - /** - * Returns whether the port was released successfully. - * - * @return whether the release is successful. - */ - public static synchronized boolean releaseLockedPort(int lockedPort) { - return PORTS.remove(lockedPort); - } - - /** - * Check port if locked. - * - * @return whether the port is locked. - */ - public static synchronized boolean checkPortIfLocked(int lockedPort) { - return PORTS.contains(lockedPort); - } -} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java deleted file mode 100644 index 408753300cc0d..0000000000000 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util; - -import static org.apache.pulsar.common.util.PortManager.checkPortIfLocked; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; -import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; -import org.testng.annotations.Test; - -public class PortManagerTest { - @Test - public void testCheckPortIfLockedAndRemove() { - int port = nextLockedFreePort(); - assertTrue(checkPortIfLocked(port)); - assertTrue(releaseLockedPort(port)); - assertFalse(checkPortIfLocked(port)); - } -} diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java index 9ec10326c26c7..aafe4889cccbe 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java @@ -31,12 +31,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import lombok.CustomLog; import lombok.Getter; import org.apache.bookkeeper.bookie.BookieImpl; -import org.apache.bookkeeper.bookie.Cookie; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.allocator.PoolingPolicy; import org.apache.bookkeeper.common.component.Lifecycle; @@ -50,7 +47,6 @@ import org.apache.bookkeeper.server.conf.BookieConfiguration; import org.apache.bookkeeper.util.IOUtils; import org.apache.commons.io.FileUtils; -import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -74,7 +70,6 @@ public class BKCluster implements AutoCloseable { protected final ServerConfiguration baseConf; protected final ClientConfiguration baseClientConf; - private final List lockedPorts = new ArrayList<>(); private final AtomicBoolean closed = new AtomicBoolean(false); public static class BKClusterConf { @@ -83,7 +78,6 @@ public static class BKClusterConf { private String metadataServiceUri; private int numBookies = 1; private String dataDir; - private int bkPort = 0; private boolean clearOldData; @@ -107,11 +101,6 @@ public BKClusterConf dataDir(String dataDir) { return this; } - public BKClusterConf bkPort(int bkPort) { - this.bkPort = bkPort; - return this; - } - public BKClusterConf clearOldData(boolean clearOldData) { this.clearOldData = clearOldData; return this; @@ -163,8 +152,6 @@ public void close() throws Exception { } catch (Exception e) { log.error().exception(e).log("Got Exception while trying to stop BKCluster"); } - lockedPorts.forEach(PortManager::releaseLockedPort); - lockedPorts.clear(); // cleanup temp dirs try { cleanupTempDirs(); @@ -240,40 +227,23 @@ private ServerConfiguration newServerConfiguration(int index) throws Exception { cleanDirectory(dataDir); } - int port; - if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts() || clusterConf.bkPort == 0) { - port = PortManager.nextLockedFreePort(); - lockedPorts.add(port); - } else { - // bk 4.15 cookie validation finds the same ip:port in case of port 0 - // and 2nd bookie's cookie validation fails - port = clusterConf.bkPort; - } - File[] cookieDir = dataDir.listFiles((file) -> file.getName().equals("current")); - if (cookieDir != null && cookieDir.length > 0) { - String existBookieAddr = parseBookieAddressFromCookie(cookieDir[0]); - if (existBookieAddr != null) { - baseConf.setAdvertisedAddress(existBookieAddr.split(":")[0]); - port = Integer.parseInt(existBookieAddr.split(":")[1]); - } - } - return newServerConfiguration(port, dataDir, new File[]{dataDir}); - } - - private String parseBookieAddressFromCookie(File dir) throws IOException { - Cookie cookie = Cookie.readFromDirectory(dir); - Pattern pattern = Pattern.compile(".*bookieHost: \"(.*?)\".*", Pattern.DOTALL); - Matcher m = pattern.matcher(cookie.toString()); - return m.find() ? m.group(1) : null; + // Bookies bind to a kernel-assigned port. Identity is established via a `bookieId` + // derived from the data dir path: bookies with the same dir (e.g. on cluster restart) + // get the same id so cookies match, while bookies with different dirs (e.g. separate + // test runs sharing the same metadata store) get different ids and don't collide. + String bookieId = "bk-" + index + "-" + Integer.toHexString(dataDir.getAbsolutePath().hashCode()); + return newServerConfiguration(0, bookieId, dataDir, new File[]{dataDir}); } private ClientConfiguration newClientConfiguration() { return new ClientConfiguration(baseConf); } - private ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) { + private ServerConfiguration newServerConfiguration(int port, String bookieId, File journalDir, + File[] ledgerDirs) { ServerConfiguration conf = new ServerConfiguration(baseConf); conf.setBookiePort(port); + conf.setBookieId(bookieId); conf.setJournalDirName(journalDir.getPath()); String[] ledgerDirNames = new String[ledgerDirs.length]; for (int i = 0; i < ledgerDirs.length; i++) { @@ -373,7 +343,7 @@ private ServerConfiguration newBaseServerConfiguration() { confReturn.setAllowEphemeralPorts(true); confReturn.setJournalWriteData(false); confReturn.setProperty("journalPreAllocSizeMB", 1); - confReturn.setBookiePort(clusterConf.bkPort); + confReturn.setBookiePort(0); confReturn.setGcWaitTime(1000L); confReturn.setDiskUsageThreshold(0.999F); confReturn.setDiskUsageWarnThreshold(0.99F); diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java index 6159741f53577..b3f1786bf810b 100644 --- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java +++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java @@ -25,7 +25,6 @@ package org.apache.bookkeeper.replication; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertFalse; import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.MoreExecutors; @@ -65,7 +64,6 @@ import org.apache.bookkeeper.test.TmpDirs; import org.apache.bookkeeper.test.ZooKeeperCluster; import org.apache.bookkeeper.test.ZooKeeperClusterUtil; -import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; @@ -311,19 +309,12 @@ protected void stopBKCluster() throws Exception { t.shutdown(); } servers.clear(); - bookiePorts.removeIf(PortManager::releaseLockedPort); + bookiePorts.clear(); } protected ServerConfiguration newServerConfiguration() throws Exception { File f = tmpDirs.createNew("bookie", "test"); - - int port; - if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) { - port = nextLockedFreePort(); - } else { - port = 0; - } - return newServerConfiguration(port, f, new File[] { f }); + return newServerConfiguration(0, f, new File[] { f }); } protected ClientConfiguration newClientConfiguration() { diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java index 3abd3580e860b..b1d2e47cc4580 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java @@ -28,7 +28,6 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertFalse; import com.google.common.base.Stopwatch; import java.io.File; @@ -85,7 +84,6 @@ import org.apache.bookkeeper.test.ZooKeeperCluster; import org.apache.bookkeeper.test.ZooKeeperClusterUtil; import org.apache.bookkeeper.util.DiskChecker; -import org.apache.pulsar.common.util.PortManager; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.testng.annotations.AfterMethod; @@ -286,19 +284,12 @@ protected void stopBKCluster() throws Exception { t.shutdown(); } servers.clear(); - bookiePorts.removeIf(PortManager::releaseLockedPort); + bookiePorts.clear(); } protected ServerConfiguration newServerConfiguration() throws Exception { File f = tmpDirs.createNew("bookie", "test"); - - int port; - if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) { - port = nextLockedFreePort(); - } else { - port = 0; - } - return newServerConfiguration(port, f, new File[] { f }); + return newServerConfiguration(0, f, new File[] { f }); } protected ClientConfiguration newClientConfiguration() { diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java deleted file mode 100644 index c5ab8dd629ce6..0000000000000 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.Inet4Address; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.channels.FileLock; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import lombok.Cleanup; - -/** - * Port manager allows a base port to be specified on the commandline. Tests will then use ports, counting up from this - * base port. This allows multiple instances of the bookkeeper tests to run at once. - */ -public class PortManager { - - private static final String lockFilename = System.getProperty("test.lockFilename", - "/tmp/pulsar-test-port-manager.lock"); - private static final int BASE_PORT = Integer.parseInt(System.getProperty("test.basePort", "15000")); - - private static final int MAX_PORT = 32000; - - /** - * Return a TCP port that is currently unused. - * - * Keeps track of assigned ports and avoid race condition between different processes - */ - public static synchronized int nextFreePort() { - Path path = Paths.get(lockFilename); - - try { - @Cleanup - FileChannel fileChannel = FileChannel.open(path, - StandardOpenOption.CREATE, - StandardOpenOption.WRITE, - StandardOpenOption.READ); - - @Cleanup - FileLock lock = fileChannel.lock(); - - ByteBuffer buffer = ByteBuffer.allocate(32); - int len = fileChannel.read(buffer, 0L); - buffer.flip(); - - int lastUsedPort = BASE_PORT; - if (len > 0) { - byte[] bytes = new byte[buffer.remaining()]; - buffer.get(bytes); - String lastUsedPortStr = new String(bytes); - lastUsedPort = Integer.parseInt(lastUsedPortStr); - } - - int freePort = probeFreePort(lastUsedPort + 1); - - buffer.clear(); - buffer.put(Integer.toString(freePort).getBytes()); - buffer.flip(); - fileChannel.write(buffer, 0L); - fileChannel.truncate(buffer.position()); - fileChannel.force(true); - - return freePort; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private static final int MAX_PORT_CONFLICTS = 10; - - private static synchronized int probeFreePort(int port) { - int exceptionCount = 0; - while (true) { - if (port == MAX_PORT) { - // Rollover the port probe - port = BASE_PORT; - } - - try (Socket s = new Socket()) { - s.connect(new InetSocketAddress(Inet4Address.getLoopbackAddress(), port), 100); - - // If we succeed to connect it means the port is being used - - } catch (ConnectException e) { - return port; - } catch (Exception e) { - e.printStackTrace(); - } - - port++; - exceptionCount++; - if (exceptionCount > MAX_PORT_CONFLICTS) { - throw new RuntimeException("Failed to find an open port"); - } - } - } - - public static void main(String[] args) throws Exception { - while (true) { - System.out.println("Port: " + nextFreePort()); - Thread.sleep(100); - } - } -} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java index d6eaf5dcc66e6..8d660b93830df 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.proxy.extensions; -import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.ServerSocket; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; import io.netty.buffer.ByteBuf; @@ -50,7 +52,6 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; -import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.proxy.server.ProxyConfiguration; import org.apache.pulsar.proxy.server.ProxyService; @@ -90,7 +91,14 @@ public void start(ProxyService service) { @Override public Map> newChannelInitializers() { - int port = nextLockedFreePort(); + // Pre-allocate a free port: extension handlers need to register a listener at a known + // address before the proxy calls back into them. + int port; + try (ServerSocket socket = new ServerSocket(0)) { + port = socket.getLocalPort(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } this.ports.add(port); return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port), new ChannelInitializer() { @@ -117,7 +125,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void close() { - ports.removeIf(PortManager::releaseLockedPort); + ports.clear(); } } From 43ac127fe3c145639f8c752cf3a81fc1877d3367 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 5 May 2026 18:43:47 -0700 Subject: [PATCH 2/7] =?UTF-8?q?Drop=20the=20bookie=20port=20Supplier=20par?= =?UTF-8?q?ameter=20=E2=80=94=20every=20caller=20is=20()=20->=200?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Now that all callers pass () -> 0, the supplier is dead weight. Remove it from LocalBookkeeperEnsemble entirely; bookies always bind to port 0. Updated all 41 call sites accordingly. --- .../org/apache/pulsar/PulsarStandalone.java | 4 ++-- .../zookeeper/LocalBookkeeperEnsemble.java | 21 +++++++------------ .../pulsar/broker/SLAMonitoringTest.java | 2 +- .../admin/GetPartitionMetadataTest.java | 2 +- .../LeaderElectionServiceTest.java | 2 +- .../broker/loadbalance/LoadBalancerTest.java | 2 +- .../loadbalance/SimpleBrokerStartTest.java | 4 ++-- .../SimpleLoadManagerImplTest.java | 2 +- .../BrokerRegistryIntegrationTest.java | 2 +- .../extensions/BrokerRegistryTest.java | 2 +- .../ExtensibleLoadManagerCloseTest.java | 2 +- .../extensions/LoadManagerFailFastTest.java | 2 +- .../impl/BundleSplitterTaskTest.java | 2 +- .../impl/ModularLoadManagerImplTest.java | 4 ++-- .../PulsarClientBasedHandlerTest.java | 2 +- .../broker/service/AdvertisedAddressTest.java | 2 +- .../service/BacklogQuotaManagerTest.java | 2 +- .../broker/service/BkEnsemblesTestBase.java | 2 +- .../service/BrokerBookieIsolationTest.java | 2 +- .../service/BrokerEventLoopShutdownTest.java | 2 +- ...econnectZKClientPulsarServiceBaseTest.java | 2 +- ...licationWithConfigurationSyncTestBase.java | 4 ++-- .../broker/service/MaxMessageSizeTest.java | 2 +- .../broker/service/NetworkErrorTestBase.java | 2 +- .../service/OneWayReplicatorTestBase.java | 4 ++-- .../broker/service/ReplicatorTestBase.java | 8 +++---- .../pulsar/broker/service/TopicOwnerTest.java | 2 +- .../persistent/ShadowTopicRealBkTest.java | 6 +++--- .../TransactionMetaStoreTestBase.java | 2 +- .../api/ClientDeduplicationFailureTest.java | 2 +- .../api/HybridTypesAcknowledgeTest.java | 2 +- .../client/api/NonPersistentTopicTest.java | 6 +++--- .../worker/PulsarFunctionE2ESecurityTest.java | 2 +- .../worker/PulsarFunctionLocalRunTest.java | 2 +- .../worker/PulsarFunctionPublishTest.java | 2 +- .../worker/PulsarFunctionTlsTest.java | 2 +- .../worker/PulsarWorkerAssignmentTest.java | 2 +- .../pulsar/io/AbstractPulsarE2ETest.java | 2 +- .../pulsar/io/PulsarFunctionAdminTest.java | 2 +- .../pulsar/io/PulsarFunctionTlsTest.java | 2 +- .../LocalBookkeeperEnsembleTest.java | 4 ++-- 41 files changed, 60 insertions(+), 67 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java index d6f6a4399f418..b673126307934 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java @@ -472,10 +472,10 @@ private void startBookieWithZookeeper() throws Exception { ServerConfiguration bkServerConf = new ServerConfiguration(); bkServerConf.loadConf(new File(configFile).toURI().toURL()); calculateCacheSize(bkServerConf); - // Start LocalBookKeeper. Bookies bind to kernel-assigned ports (() -> 0). + // Start LocalBookKeeper. Bookies bind to kernel-assigned ports. bkEnsemble = new LocalBookkeeperEnsemble( this.getNumOfBk(), this.getZkPort(), this.getStreamStoragePort(), this.getZkDir(), - this.getBkDir(), this.isWipeData(), "127.0.0.1", () -> 0); + this.getBkDir(), this.isWipeData(), "127.0.0.1"); bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage()); config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index 262b4a7b9b9db..336cdf85aef2b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -42,7 +42,6 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import java.util.stream.Stream; import java.util.stream.StreamSupport; import lombok.CustomLog; @@ -91,23 +90,18 @@ public class LocalBookkeeperEnsemble { int numberOfBookies; private final boolean clearOldData; - // Supplier of bookie ports. Tests typically use `() -> 0` so the kernel picks a free port at - // bind time. Bookies are identified by `bookieId`, not host:port, so multiple ephemeral - // bookies coexist fine. - private final Supplier portManager; - - public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, Supplier portManager) { - this(numberOfBookies, zkPort, 4181, null, null, true, null, portManager); + public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort) { + this(numberOfBookies, zkPort, 4181, null, null, true, null); } public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String zkDataDirName, String bkDataDirName, boolean clearOldData) { - this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, null, () -> 0); + this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, null); } public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String zkDataDirName, String bkDataDirName, boolean clearOldData, String advertisedAddress) { - this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress, () -> 0); + this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress); } public LocalBookkeeperEnsemble(int numberOfBookies, @@ -116,10 +110,8 @@ public LocalBookkeeperEnsemble(int numberOfBookies, String zkDataDirName, String bkDataDirName, boolean clearOldData, - String advertisedAddress, - Supplier portManager) { + String advertisedAddress) { this.numberOfBookies = numberOfBookies; - this.portManager = portManager; this.streamStoragePort = streamStoragePort; this.zkDataDirName = zkDataDirName; this.bkDataDirName = bkDataDirName; @@ -278,7 +270,8 @@ private void runBookies(ServerConfiguration baseConf) throws Exception { cleanDirectory(bkDataDir); } - int bookiePort = portManager.get(); + // Bookies bind to a kernel-assigned port; identity is established via bookieId. + int bookiePort = 0; String bookieId = "bk" + i + "test"; // Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully deleteBookieRegistrationZnode( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index 523006e9d74a6..3426753fbc66e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -71,7 +71,7 @@ void setup() throws Exception { new LinkedBlockingQueue<>()); log.info("---- Initializing SLAMonitoringTest -----"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // start brokers diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java index 49e4c6b21035e..b4dec43054774 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -78,7 +78,7 @@ public class GetPartitionMetadataTest extends TestRetrySupport { @BeforeClass(alwaysRun = true) protected void setup() throws Exception { incrementSetupNumber(); - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker. setupBrokers(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java index 62571cc3ff40b..0bdeb4341d99e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java @@ -52,7 +52,7 @@ public class LeaderElectionServiceTest { @BeforeMethod(alwaysRun = true) public void setup() throws Exception { - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); log.info("---- bk started ----"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index c6617bbacaaee..cf8db26a4a6b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -107,7 +107,7 @@ public class LoadBalancerTest { @BeforeMethod void setup() throws Exception { // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); ZkUtils.createFullPathOptimistic(bkEnsemble.getZkClient(), SimpleLoadManagerImpl.LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java index 879afd00c1b78..36bd4b1e0fc16 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java @@ -48,7 +48,7 @@ public void testHasNICSpeed() throws Exception { } // Start local bookkeeper ensemble @Cleanup("stop") - LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker ServiceConfiguration config = new ServiceConfiguration(); @@ -79,7 +79,7 @@ public void testNoNICSpeed() throws Exception { } // Start local bookkeeper ensemble @Cleanup("stop") - LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker ServiceConfiguration config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index 6103b948236e6..20a5883d65df7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -124,7 +124,7 @@ public class SimpleLoadManagerImplTest { void setup() throws Exception { // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker 1 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java index 80e54278d31e4..138ad1dee1109 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java @@ -47,7 +47,7 @@ public class BrokerRegistryIntegrationTest { @BeforeClass protected void setup() throws Exception { - bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); + bk = new LocalBookkeeperEnsemble(2, 0); bk.start(); pulsar = new PulsarService(brokerConfig()); pulsar.start(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java index 0247a68256e69..73b50b4603a9a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java @@ -169,7 +169,7 @@ void setup() throws Exception { executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java index 50265c28edc7d..f0734fbe8bbe6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java @@ -47,7 +47,7 @@ public class ExtensibleLoadManagerCloseTest { @BeforeClass(alwaysRun = true) public void setup() throws Exception { - bk = new LocalBookkeeperEnsemble(1, 0, () -> 0); + bk = new LocalBookkeeperEnsemble(1, 0); bk.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java index b57c4f72c8124..1dc254492b74f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java @@ -37,7 +37,7 @@ public class LoadManagerFailFastTest { private static final String cluster = "test"; - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0); private final ServiceConfiguration config = new ServiceConfiguration(); @BeforeClass diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java index 7a012b78756d4..085bea1170e8c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java @@ -59,7 +59,7 @@ public class BundleSplitterTaskTest { @BeforeMethod void setup() throws Exception { // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker ServiceConfiguration config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index fcfcabd97c588..c92de5eeb9cfe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -39,6 +39,7 @@ import com.google.common.hash.Hashing; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.net.ServerSocket; import java.net.URL; import java.util.ArrayList; import java.util.EnumSet; @@ -88,7 +89,6 @@ import org.apache.pulsar.common.policies.data.ResourceQuota; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.ObjectMapperFactory; -import java.net.ServerSocket; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; @@ -173,7 +173,7 @@ void setup() throws Exception { executor = new ThreadPoolExecutor(1, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker 1 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java index 3e8c28d8de7c8..0c6797c93fafc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java @@ -37,7 +37,7 @@ public class PulsarClientBasedHandlerTest { private static final String clusterName = "cluster"; private static final int shutdownTimeoutMs = 100; - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0); private File tempDirectory; private PulsarService pulsar; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java index 8dafe7a6af32d..d76c7fb19d2ab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java @@ -41,7 +41,7 @@ public class AdvertisedAddressTest { @BeforeMethod public void setup() throws Exception { - bkEnsemble = new LocalBookkeeperEnsemble(1, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(1, 0); bkEnsemble.start(); ServiceConfiguration config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 6ac258d0ba83a..54c432fd870de 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -138,7 +138,7 @@ public Object[][] backlogQuotaSizeGB() { void setup() throws Exception { try { // start local bookie and zookeeper - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // start pulsar service diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java index 4a1fcbb04db30..2b01bbbb9d1b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java @@ -86,7 +86,7 @@ protected void setup() throws Exception { incrementSetupNumber(); try { // start local bookie and zookeeper - bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0); bkEnsemble.start(); // start pulsar service diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java index 2403675253fa4..24be9695184d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java @@ -97,7 +97,7 @@ public class BrokerBookieIsolationTest { @BeforeMethod protected void setup() throws Exception { // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(4, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(4, 0); bkEnsemble.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java index 29421f155b6db..b266710219822 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java @@ -37,7 +37,7 @@ public class BrokerEventLoopShutdownTest { @BeforeClass(alwaysRun = true) public void setup() throws Exception { - bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); + bk = new LocalBookkeeperEnsemble(2, 0); bk.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java index c0bd01c64a2bb..f9f0030c45fe6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java @@ -80,7 +80,7 @@ protected void startZKAndBK() throws Exception { brokerConfigZk.start(); // Start BK. - bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0); bkEnsemble.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java index 3b4bac53fec42..728af78999204 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java @@ -77,9 +77,9 @@ protected void startZKAndBK() throws Exception { brokerConfigZk2.start(); // Start BK. - bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble1.start(); - bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble2.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java index 61e454622907b..b4a7e4ef0ffd5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java @@ -54,7 +54,7 @@ public class MaxMessageSizeTest { @BeforeMethod void setup() { try { - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); ServerConfiguration conf = new ServerConfiguration(); conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024 + 10 * 1024); bkEnsemble.startStandalone(conf, false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java index 0d5ae2a4c0d26..de30a4c9d82c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java @@ -86,7 +86,7 @@ public abstract class NetworkErrorTestBase extends TestRetrySupport { protected void startZKAndBK() throws Exception { // Start ZK & BK. - bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble1.start(); metadataZKProxy = new Ipv4Proxy(getOneFreePort(), "127.0.0.1", bkEnsemble1.getZookeeperPort()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index a73a2844f79dd..0aec5358be6f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -114,9 +114,9 @@ protected void startZKAndBK() throws Exception { } // Start BK. - bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble1.start(); - bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble2.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 265d685223e75..755230a8537b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -155,7 +155,7 @@ protected void setup() throws Exception { globalZkS.start(); // Start region 1 - bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble1.start(); // NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have @@ -174,7 +174,7 @@ protected void setup() throws Exception { // Start region 2 // Start zk & bks - bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble2.start(); setConfig2DefaultValue(); @@ -190,7 +190,7 @@ protected void setup() throws Exception { // Start region 3 // Start zk & bks - bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble3.start(); setConfig3DefaultValue(); @@ -206,7 +206,7 @@ protected void setup() throws Exception { // Start region 4 // Start zk & bks - bkEnsemble4 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble4 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble4.start(); setConfig4DefaultValue(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java index 809f9a5a6e0ed..cccc3ceeb5260 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java @@ -71,7 +71,7 @@ public class TopicOwnerTest { void setup() throws Exception { log.info("---- Initializing TopicOwnerTest -----"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // start brokers diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java index 8f89bd74aabfb..505a5603157e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java @@ -41,9 +41,9 @@ public class ShadowTopicRealBkTest { private static final String cluster = "test"; - // Both ZK and bookie ports use 0 so the kernel picks free ports at bind time. The actual - // ZK port is read back via bk.getZookeeperPort() after start. - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); + // ZK port 0 lets the kernel pick a free port at bind time. The actual port is read back + // via bk.getZookeeperPort() after start. Bookies always bind to kernel-assigned ports. + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0); private PulsarService pulsar; private PulsarAdmin admin; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java index 81321427a2b4b..904a4807ad391 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java @@ -50,7 +50,7 @@ public abstract class TransactionMetaStoreTestBase extends TestRetrySupport { protected final void setup() throws Exception { log.info().attr("class", getClass().getSimpleName()).log("---- Initializing -----"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); String[] args = new String[]{ diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java index 2602233293e78..6f173aa2ba04e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java @@ -76,7 +76,7 @@ void setup(Method method) throws Exception { log.info().attr("upMethod", method.getName()).log("--- Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java index 3fa6750c7de33..451a15ebe50ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java @@ -73,7 +73,7 @@ public class HybridTypesAcknowledgeTest extends TestRetrySupport { @BeforeClass(alwaysRun = true) protected void setup() throws Exception { incrementSetupNumber(); - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // Start broker. setupBrokers(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 97126457cd1ee..1552940cda407 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -989,7 +989,7 @@ void setupReplicationCluster() throws Exception { globalZkS.start(); // Start region 1 - bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble1.start(); // NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have @@ -1019,7 +1019,7 @@ void setupReplicationCluster() throws Exception { // Start region 2 // Start zk & bks - bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble2.start(); config2 = new ServiceConfiguration(); @@ -1046,7 +1046,7 @@ void setupReplicationCluster() throws Exception { // Start region 3 // Start zk & bks - bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0); bkEnsemble3.start(); config3 = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java index 3fbd4564a922c..a39b1ef1b87a3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java @@ -119,7 +119,7 @@ void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java index 122a81d3779a0..b6952682fa114 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java @@ -202,7 +202,7 @@ void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index 1bbabca36139d..5450c0c17b3ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -124,7 +124,7 @@ void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 6963e880b7973..734a3e473c093 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -95,7 +95,7 @@ public class PulsarFunctionTlsTest { void setup() throws Exception { log.info("---- Initializing TopicOwnerTest -----"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); // start brokers diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index 7766b8a5054b4..ae91b671643f0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -83,7 +83,7 @@ void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java index cc010b82d9b90..53705e13be689 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java @@ -114,7 +114,7 @@ public void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("--- Setting up method ---"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java index 9e4b12f0a46b2..2bda831a999df 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java @@ -93,7 +93,7 @@ void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index aa78d3aaa916b..f121566d03787 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -110,7 +110,7 @@ void setup(Method method) throws Exception { log.info().attr("method", method.getName()).log("Setting up method"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0); bkEnsemble.start(); config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java index 276a911313d73..65ad1e490c0ff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java @@ -42,7 +42,7 @@ public void testStartStop() throws Exception { final int numBk = 1; // Start local Bookies/ZooKeepers and confirm that they are running at specified ports - LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, 0, () -> 0); + LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, 0); ensemble.start(); assertTrue(ensemble.getZkServer().isRunning()); assertEquals(ensemble.getZkServer().getClientPort(), ensemble.getZookeeperPort()); @@ -60,7 +60,7 @@ public void testStartStop() throws Exception { public void testStartWithSpecifiedStreamStoragePort() throws Exception { LocalBookkeeperEnsemble ensemble = null; try { - ensemble = new LocalBookkeeperEnsemble(1, 0, 4182, null, null, true, null, () -> 0); + ensemble = new LocalBookkeeperEnsemble(1, 0, 4182, null, null, true, null); ensemble.startStandalone(new ServerConfiguration(), true); } finally { if (ensemble != null) { From 22a63f90cd9716fca5748992796579ad2b3705d5 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 5 May 2026 19:19:43 -0700 Subject: [PATCH 3/7] Fix import order to satisfy checkstyle --- .../broker/protocol/SimpleProtocolHandlerTestsBase.java | 6 +++--- .../apache/pulsar/client/impl/ServiceUrlQuarantineTest.java | 6 +++--- .../proxy/extensions/SimpleProxyExtensionTestBase.java | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java index cf4a329f5cd5f..8d03e4fb7e304 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java @@ -18,9 +18,6 @@ */ package org.apache.pulsar.broker.protocol; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.net.ServerSocket; import static org.testng.Assert.assertEquals; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; @@ -31,7 +28,10 @@ import io.netty.channel.socket.SocketChannel; import java.io.File; import java.io.FileOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java index fae597d699dd7..c2cb9a05059fe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java @@ -21,7 +21,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.URI; import java.net.URISyntaxException; import java.util.HashSet; @@ -37,9 +40,6 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionMode; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.net.ServerSocket; import org.apache.pulsar.common.net.ServiceURI; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java index 8d660b93830df..f7d86fb6acd34 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java @@ -18,9 +18,6 @@ */ package org.apache.pulsar.proxy.extensions; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.net.ServerSocket; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; import io.netty.buffer.ByteBuf; @@ -32,7 +29,10 @@ import io.netty.channel.socket.SocketChannel; import java.io.File; import java.io.FileOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; From b0e844dbab0f66620878e978ff4dff627ff31982 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 6 May 2026 08:38:48 -0700 Subject: [PATCH 4/7] Restore PortManager (improved) for tests that need pre-allocated ports Address review feedback: PortManager is still useful for tests that need to know a port BEFORE binding (advertised-listener URLs, znode pre-creation, unavailable-node URL synthesis). The old implementation suffered from the JVM-level lock not preventing OS-level collisions, since it allocated inside the kernel's ephemeral range. The restored PortManager addresses both problems: - Allocates from outside the ephemeral range (Linux default 32768-60999) by reserving a 1000-port block per JVM. Each JVM claims a block by binding a lock ServerSocket on the block's base port for the JVM's lifetime; other JVMs that hit the same range observe the bind failure and pick the next. - Released ports do not return to the pool immediately. They sit in a pending-release set until a fresh bind to the port succeeds, which avoids handing out a port that's still in TIME_WAIT. Wired PortManager back into the call sites that genuinely need pre-allocation: - ServiceUrlQuarantineTest (broker port + 40 unavailable-node ports). - ExtensibleLoadManagerImplWithAdvertisedListenersTest (advertised listeners). - ModularLoadManagerImplTest.testOwnBrokerZnodeByMultipleBroker (znode pre-creation). - AdvertisedListenersTest (advertised listeners per broker). - SimpleProtocolHandlerTestsBase / SimpleProxyExtensionTestBase (handler listener address). - PulsarTestContext.handlePreallocatePorts (preallocatePorts(true) preserved). Everything else still uses port 0 with kernel assignment. --- .../loadbalance/AdvertisedListenersTest.java | 18 +-- ...anagerImplWithAdvertisedListenersTest.java | 16 +-- .../impl/ModularLoadManagerImplTest.java | 45 +++---- .../SimpleProtocolHandlerTestsBase.java | 14 +- .../broker/testcontext/PulsarTestContext.java | 31 +++-- .../client/impl/ServiceUrlQuarantineTest.java | 30 ++--- .../pulsar/common/util/PortManager.java | 120 ++++++++++++++++++ .../pulsar/common/util/PortManagerTest.java | 59 +++++++++ .../SimpleProxyExtensionTestBase.java | 14 +- 9 files changed, 247 insertions(+), 100 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java index ba0edcf8dbb19..64c0ee3f47ab9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java @@ -18,11 +18,9 @@ */ package org.apache.pulsar.broker.loadbalance; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.net.ServerSocket; import java.net.URI; import java.util.Optional; import lombok.Cleanup; @@ -69,9 +67,9 @@ protected ServiceConfiguration createConfForAdditionalBroker(int additionalBroke private void updateConfig(ServiceConfiguration conf, String advertisedAddress) { // Pre-allocate ports because the advertised listener URLs are baked into config // before the broker starts. The broker then binds to the same ports. - int pulsarPort = allocateFreePort(); - int httpPort = allocateFreePort(); - int httpsPort = allocateFreePort(); + int pulsarPort = nextLockedFreePort(); + int httpPort = nextLockedFreePort(); + int httpsPort = nextLockedFreePort(); // Use invalid domain name as identifier and instead make sure the advertised listeners work as intended conf.setAdvertisedAddress(advertisedAddress); @@ -83,14 +81,6 @@ private void updateConfig(ServiceConfiguration conf, String advertisedAddress) { conf.setWebServicePort(Optional.of(httpPort)); } - private static int allocateFreePort() { - try (ServerSocket socket = new ServerSocket(0)) { - return socket.getLocalPort(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - @Test public void testLookup() throws Exception { HttpGet request = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java index ec4fedfd3a9bd..656088c2ba5b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java @@ -18,14 +18,12 @@ */ package org.apache.pulsar.broker.loadbalance.extensions; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.net.ServerSocket; import java.util.Optional; import lombok.CustomLog; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.util.PortManager; import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; import org.testng.annotations.Test; @@ -50,8 +48,8 @@ protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { super.updateConfig(conf); // Pre-allocate ports because advertised listener URLs are baked into config // before the broker starts. - int privatePulsarPort = allocateFreePort(); - int publicPulsarPort = allocateFreePort(); + int privatePulsarPort = PortManager.nextLockedFreePort(); + int publicPulsarPort = PortManager.nextLockedFreePort(); conf.setInternalListenerName("internal"); conf.setBindAddresses("external:pulsar://localhost:" + publicPulsarPort); conf.setAdvertisedListeners( @@ -65,14 +63,6 @@ protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { return conf; } - private static int allocateFreePort() { - try (ServerSocket socket = new ServerSocket(0)) { - return socket.getLocalPort(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - @DataProvider(name = "isPersistentTopicSubscriptionTypeTest") public Object[][] isPersistentTopicSubscriptionTypeTest() { return new Object[][]{ diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index c92de5eeb9cfe..ecabd901116f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -39,7 +39,6 @@ import com.google.common.hash.Hashing; import java.lang.reflect.Field; import java.lang.reflect.Method; -import java.net.ServerSocket; import java.net.URL; import java.util.ArrayList; import java.util.EnumSet; @@ -89,6 +88,7 @@ import org.apache.pulsar.common.policies.data.ResourceQuota; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; @@ -949,29 +949,30 @@ public void testOwnBrokerZnodeByMultipleBroker() throws Exception { config.setClusterName("use"); // Pre-allocate a port: the test creates a znode at the broker's would-be address before // starting the broker, so it needs to know the address up front. - int webPort; - try (ServerSocket socket = new ServerSocket(0)) { - webPort = socket.getLocalPort(); - } - config.setWebServicePort(Optional.of(webPort)); - config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); - config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); - config.setBrokerServicePort(Optional.of(0)); - PulsarService pulsar = new PulsarService(config); - // create znode using different zk-session - final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getAdvertisedAddress() + ":" - + config.getWebServicePort().get(); - pulsar1.getLocalMetadataStore() - .put(brokerZnode, new byte[0], Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join(); + int webPort = PortManager.nextLockedFreePort(); try { - pulsar.start(); - fail("should have failed"); - } catch (PulsarServerException e) { - //Ok. - } + config.setWebServicePort(Optional.of(webPort)); + config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); + config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); + config.setBrokerServicePort(Optional.of(0)); + PulsarService pulsar = new PulsarService(config); + // create znode using different zk-session + final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getAdvertisedAddress() + ":" + + config.getWebServicePort().get(); + pulsar1.getLocalMetadataStore() + .put(brokerZnode, new byte[0], Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join(); + try { + pulsar.start(); + fail("should have failed"); + } catch (PulsarServerException e) { + //Ok. + } - pulsar.close(); + pulsar.close(); + } finally { + PortManager.releaseLockedPort(webPort); + } } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java index 8d03e4fb7e304..f1cb937868a4e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.protocol; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; +import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; import static org.testng.Assert.assertEquals; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; @@ -28,10 +30,7 @@ import io.netty.channel.socket.SocketChannel; import java.io.File; import java.io.FileOutputStream; -import java.io.IOException; -import java.io.UncheckedIOException; import java.net.InetSocketAddress; -import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; @@ -91,12 +90,7 @@ public void start(BrokerService service) { public Map> newChannelInitializers() { // Pre-allocate a free port: protocol handlers need to register a listener at a known // address before the broker calls back into them. - int port; - try (ServerSocket socket = new ServerSocket(0)) { - port = socket.getLocalPort(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + int port = nextLockedFreePort(); this.ports.add(port); return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port), new ChannelInitializer() { @@ -123,7 +117,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void close() { - ports.clear(); + ports.removeIf(p -> releaseLockedPort(p)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index 8e22fad2a76b4..c7136fcb84ab4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -62,6 +62,7 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage; import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass; import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; @@ -723,38 +724,36 @@ private TestZKServer createTestZookeeper(int sessionTimeout) throws Exception { protected void handlePreallocatePorts(ServiceConfiguration config) { if (super.preallocatePorts) { // Pre-allocate ports for callers that need the port number BEFORE the broker - // starts (e.g. to build advertised-listener URLs). We open a ServerSocket on - // port 0, take the kernel-assigned port, and close the socket. There is a brief - // window where another process could grab the same port; tests that don't need - // a pre-known port should leave preallocatePorts=false and bind to 0 directly. + // starts (e.g. to build advertised-listener URLs). PortManager hands out ports + // outside the ephemeral range, so the kernel won't auto-assign them to other + // processes. Tests that don't need a pre-known port should leave + // preallocatePorts=false and let the broker bind to 0 directly. config.getBrokerServicePort().ifPresent(portNumber -> { if (portNumber == 0) { - config.setBrokerServicePort(Optional.of(allocateFreePort())); + config.setBrokerServicePort(Optional.of(PortManager.nextLockedFreePort())); } }); config.getBrokerServicePortTls().ifPresent(portNumber -> { if (portNumber == 0) { - config.setBrokerServicePortTls(Optional.of(allocateFreePort())); + config.setBrokerServicePortTls(Optional.of(PortManager.nextLockedFreePort())); } }); config.getWebServicePort().ifPresent(portNumber -> { if (portNumber == 0) { - config.setWebServicePort(Optional.of(allocateFreePort())); + config.setWebServicePort(Optional.of(PortManager.nextLockedFreePort())); } }); config.getWebServicePortTls().ifPresent(portNumber -> { if (portNumber == 0) { - config.setWebServicePortTls(Optional.of(allocateFreePort())); + config.setWebServicePortTls(Optional.of(PortManager.nextLockedFreePort())); } }); - } - } - - private static int allocateFreePort() { - try (java.net.ServerSocket socket = new java.net.ServerSocket(0)) { - return socket.getLocalPort(); - } catch (java.io.IOException e) { - throw new java.io.UncheckedIOException(e); + registerCloseable(() -> { + config.getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); + config.getBrokerServicePortTls().ifPresent(PortManager::releaseLockedPort); + config.getWebServicePort().ifPresent(PortManager::releaseLockedPort); + config.getWebServicePortTls().ifPresent(PortManager::releaseLockedPort); + }); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java index c2cb9a05059fe..db465e643acbb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java @@ -21,10 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -import java.io.IOException; -import java.io.UncheckedIOException; import java.net.InetSocketAddress; -import java.net.ServerSocket; import java.net.URI; import java.net.URISyntaxException; import java.util.HashSet; @@ -41,6 +38,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.common.net.ServiceURI; +import org.apache.pulsar.common.util.PortManager; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -56,6 +54,7 @@ public class ServiceUrlQuarantineTest extends ProducerConsumerBase { private PulsarClientImpl pulsarClientWithHttpServiceUrlDisableQuarantine; private int brokerServicePort; private int webServicePort; + private final Set reservedPorts = new HashSet<>(); private static final int UNAVAILABLE_NODES = 20; private static final int TIMEOUT_MS = 500; @@ -63,17 +62,18 @@ public class ServiceUrlQuarantineTest extends ProducerConsumerBase { @Override protected void setup() throws Exception { // Pre-allocate ports for the broker because they must match the URL the test builds below. - this.brokerServicePort = allocateFreePort(); - this.webServicePort = allocateFreePort(); + this.brokerServicePort = nextLockedFreePort(); + this.webServicePort = nextLockedFreePort(); super.internalSetup(); super.producerBaseSetup(); - // Create a Pulsar client with some unavailable nodes (port allocated, socket closed — - // nothing is bound there during the test). + // Build a service URL that includes a bunch of unavailable-node ports. PortManager + // hands out ports outside the ephemeral range, so nothing else is going to grab them + // and the connection attempts to those addresses will fail as expected. StringBuilder binaryServiceUrlBuilder = new StringBuilder(pulsar.getBrokerServiceUrl()); StringBuilder httpServiceUrlBuilder = new StringBuilder(pulsar.getWebServiceAddress()); for (int i = 0; i < UNAVAILABLE_NODES; i++) { - binaryServiceUrlBuilder.append(",127.0.0.1:").append(allocateFreePort()); - httpServiceUrlBuilder.append(",127.0.0.1:").append(allocateFreePort()); + binaryServiceUrlBuilder.append(",127.0.0.1:").append(nextLockedFreePort()); + httpServiceUrlBuilder.append(",127.0.0.1:").append(nextLockedFreePort()); } this.binaryServiceUrlWithUnavailableNodes = binaryServiceUrlBuilder.toString(); this.httpServiceUrlWithUnavailableNodes = httpServiceUrlBuilder.toString(); @@ -100,12 +100,10 @@ protected void setup() throws Exception { .build(); } - private static int allocateFreePort() { - try (ServerSocket socket = new ServerSocket(0)) { - return socket.getLocalPort(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + private int nextLockedFreePort() { + int port = PortManager.nextLockedFreePort(); + reservedPorts.add(port); + return port; } @Override @@ -138,6 +136,8 @@ protected void cleanup() throws Exception { if (pulsarClientWithHttpServiceUrlDisableQuarantine != null) { pulsarClientWithHttpServiceUrlDisableQuarantine.close(); } + reservedPorts.forEach(PortManager::releaseLockedPort); + reservedPorts.clear(); } @Test diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java new file mode 100644 index 0000000000000..7686702c760f3 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.HashSet; +import java.util.Set; + +/** + * Allocates ports for tests that need a known port number BEFORE binding (e.g. tests that build + * advertised-listener URLs, or that pre-create metadata znodes at the broker's would-be address). + * For everything else, prefer binding to port 0 and reading the kernel-assigned port back. + * + *

This manager allocates from outside the OS ephemeral range (Linux default 32768–60999), so + * the kernel will not hand out our reserved ports to other processes binding port 0. To coordinate + * across multiple test JVMs without a shared registry, each JVM claims a 1000-port block by + * binding a "lock" {@link ServerSocket} on the block's base port for the JVM's lifetime; other + * JVMs that hit the same range observe the bind failure and pick the next block. + * + *

Within the block, ports are tracked in a JVM-local set. Released ports are not returned to + * the pool immediately because closed sockets can sit in {@code TIME_WAIT}; instead they move + * back to the free list once a fresh bind to the port succeeds. + */ +public class PortManager { + + private static final int BLOCK_SIZE = 1000; + private static final int FIRST_BLOCK_BASE = 20000; + private static final int LAST_BLOCK_BASE = 32000; + + private static ServerSocket blockLock; + private static int blockBase; + private static final Set usedPorts = new HashSet<>(); + private static final Set pendingRelease = new HashSet<>(); + + /** + * Return a port that is currently free and is reserved for the caller until + * {@link #releaseLockedPort(int)} is invoked. + */ + public static synchronized int nextLockedFreePort() { + ensureBlockReserved(); + // Reclaim ports whose underlying socket has finished its TIME_WAIT and is bindable again. + pendingRelease.removeIf(PortManager::isPortBindable); + + for (int offset = 1; offset < BLOCK_SIZE; offset++) { + int port = blockBase + offset; + if (usedPorts.contains(port) || pendingRelease.contains(port)) { + continue; + } + if (isPortBindable(port)) { + usedPorts.add(port); + return port; + } + } + throw new RuntimeException("No free ports left in block " + blockBase + + " (used=" + usedPorts.size() + ", pendingRelease=" + pendingRelease.size() + ")"); + } + + /** + * Mark the given port as released. The port stays in a pending-release state and is not + * handed out again until a future {@link #nextLockedFreePort()} verifies it can be re-bound. + * + * @return true if the port was previously locked by this manager + */ + public static synchronized boolean releaseLockedPort(int lockedPort) { + if (!usedPorts.remove(lockedPort)) { + return false; + } + pendingRelease.add(lockedPort); + return true; + } + + /** + * @return true if the port is currently locked by this manager + */ + public static synchronized boolean checkPortIfLocked(int lockedPort) { + return usedPorts.contains(lockedPort); + } + + private static void ensureBlockReserved() { + if (blockLock != null) { + return; + } + for (int base = FIRST_BLOCK_BASE; base <= LAST_BLOCK_BASE; base += BLOCK_SIZE) { + try { + blockLock = new ServerSocket(base); + blockBase = base; + return; + } catch (IOException ignored) { + // block already taken (likely another test JVM); try the next one + } + } + throw new RuntimeException("Unable to reserve a port block in [" + + FIRST_BLOCK_BASE + ", " + (LAST_BLOCK_BASE + BLOCK_SIZE) + ")"); + } + + private static boolean isPortBindable(int port) { + try (ServerSocket socket = new ServerSocket(port)) { + return true; + } catch (IOException e) { + return false; + } + } +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java new file mode 100644 index 0000000000000..410e0c82ad018 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; +import org.testng.annotations.Test; + +public class PortManagerTest { + + @Test + public void allocatesPortsOutsideEphemeralRange() { + int port = PortManager.nextLockedFreePort(); + try { + // Linux default ephemeral range is 32768-60999; we allocate below it. + assertTrue(port >= 20000 && port < 33000, "port " + port + " not in expected range"); + assertTrue(PortManager.checkPortIfLocked(port)); + } finally { + PortManager.releaseLockedPort(port); + } + } + + @Test + public void allocatesDistinctPorts() { + int p1 = PortManager.nextLockedFreePort(); + int p2 = PortManager.nextLockedFreePort(); + try { + assertNotEquals(p1, p2); + } finally { + PortManager.releaseLockedPort(p1); + PortManager.releaseLockedPort(p2); + } + } + + @Test + public void releasingMarksPortAsUnlocked() { + int port = PortManager.nextLockedFreePort(); + assertTrue(PortManager.checkPortIfLocked(port)); + assertTrue(PortManager.releaseLockedPort(port)); + assertFalse(PortManager.checkPortIfLocked(port)); + } +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java index f7d86fb6acd34..c834e09bd8925 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.proxy.extensions; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; +import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; import io.netty.buffer.ByteBuf; @@ -29,10 +31,7 @@ import io.netty.channel.socket.SocketChannel; import java.io.File; import java.io.FileOutputStream; -import java.io.IOException; -import java.io.UncheckedIOException; import java.net.InetSocketAddress; -import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; @@ -93,12 +92,7 @@ public void start(ProxyService service) { public Map> newChannelInitializers() { // Pre-allocate a free port: extension handlers need to register a listener at a known // address before the proxy calls back into them. - int port; - try (ServerSocket socket = new ServerSocket(0)) { - port = socket.getLocalPort(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + int port = nextLockedFreePort(); this.ports.add(port); return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port), new ChannelInitializer() { @@ -125,7 +119,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void close() { - ports.clear(); + ports.removeIf(p -> releaseLockedPort(p)); } } From 3d5e2b68bc0da1a7ece541b13f40cbaa9da654a8 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 6 May 2026 08:42:31 -0700 Subject: [PATCH 5/7] =?UTF-8?q?Drop=20block-based=20allocation=20from=20Po?= =?UTF-8?q?rtManager=20=E2=80=94=20keep=20the=20simple=20ServerSocket(0)?= =?UTF-8?q?=20version?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pulsar/common/util/PortManager.java | 87 ++++--------------- .../pulsar/common/util/PortManagerTest.java | 5 +- 2 files changed, 21 insertions(+), 71 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java index 7686702c760f3..fab5104e8c39c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.common.util; -import java.io.IOException; import java.net.ServerSocket; import java.util.HashSet; import java.util.Set; @@ -27,94 +26,46 @@ * Allocates ports for tests that need a known port number BEFORE binding (e.g. tests that build * advertised-listener URLs, or that pre-create metadata znodes at the broker's would-be address). * For everything else, prefer binding to port 0 and reading the kernel-assigned port back. - * - *

This manager allocates from outside the OS ephemeral range (Linux default 32768–60999), so - * the kernel will not hand out our reserved ports to other processes binding port 0. To coordinate - * across multiple test JVMs without a shared registry, each JVM claims a 1000-port block by - * binding a "lock" {@link ServerSocket} on the block's base port for the JVM's lifetime; other - * JVMs that hit the same range observe the bind failure and pick the next block. - * - *

Within the block, ports are tracked in a JVM-local set. Released ports are not returned to - * the pool immediately because closed sockets can sit in {@code TIME_WAIT}; instead they move - * back to the free list once a fresh bind to the port succeeds. */ public class PortManager { - private static final int BLOCK_SIZE = 1000; - private static final int FIRST_BLOCK_BASE = 20000; - private static final int LAST_BLOCK_BASE = 32000; - - private static ServerSocket blockLock; - private static int blockBase; - private static final Set usedPorts = new HashSet<>(); - private static final Set pendingRelease = new HashSet<>(); + private static final Set PORTS = new HashSet<>(); /** - * Return a port that is currently free and is reserved for the caller until - * {@link #releaseLockedPort(int)} is invoked. + * Return a free port that is reserved for the caller until {@link #releaseLockedPort(int)} + * is invoked. */ public static synchronized int nextLockedFreePort() { - ensureBlockReserved(); - // Reclaim ports whose underlying socket has finished its TIME_WAIT and is bindable again. - pendingRelease.removeIf(PortManager::isPortBindable); - - for (int offset = 1; offset < BLOCK_SIZE; offset++) { - int port = blockBase + offset; - if (usedPorts.contains(port) || pendingRelease.contains(port)) { - continue; - } - if (isPortBindable(port)) { - usedPorts.add(port); - return port; + int exceptionCount = 0; + while (true) { + try (ServerSocket ss = new ServerSocket(0)) { + int port = ss.getLocalPort(); + if (!checkPortIfLocked(port)) { + PORTS.add(port); + return port; + } + } catch (Exception e) { + exceptionCount++; + if (exceptionCount > 100) { + throw new RuntimeException("Unable to allocate socket port", e); + } } } - throw new RuntimeException("No free ports left in block " + blockBase - + " (used=" + usedPorts.size() + ", pendingRelease=" + pendingRelease.size() + ")"); } /** - * Mark the given port as released. The port stays in a pending-release state and is not - * handed out again until a future {@link #nextLockedFreePort()} verifies it can be re-bound. + * Release a previously locked port. * * @return true if the port was previously locked by this manager */ public static synchronized boolean releaseLockedPort(int lockedPort) { - if (!usedPorts.remove(lockedPort)) { - return false; - } - pendingRelease.add(lockedPort); - return true; + return PORTS.remove(lockedPort); } /** * @return true if the port is currently locked by this manager */ public static synchronized boolean checkPortIfLocked(int lockedPort) { - return usedPorts.contains(lockedPort); - } - - private static void ensureBlockReserved() { - if (blockLock != null) { - return; - } - for (int base = FIRST_BLOCK_BASE; base <= LAST_BLOCK_BASE; base += BLOCK_SIZE) { - try { - blockLock = new ServerSocket(base); - blockBase = base; - return; - } catch (IOException ignored) { - // block already taken (likely another test JVM); try the next one - } - } - throw new RuntimeException("Unable to reserve a port block in [" - + FIRST_BLOCK_BASE + ", " + (LAST_BLOCK_BASE + BLOCK_SIZE) + ")"); - } - - private static boolean isPortBindable(int port) { - try (ServerSocket socket = new ServerSocket(port)) { - return true; - } catch (IOException e) { - return false; - } + return PORTS.contains(lockedPort); } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java index 410e0c82ad018..b3f91617a5c1a 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java @@ -26,11 +26,10 @@ public class PortManagerTest { @Test - public void allocatesPortsOutsideEphemeralRange() { + public void allocatesAFreePort() { int port = PortManager.nextLockedFreePort(); try { - // Linux default ephemeral range is 32768-60999; we allocate below it. - assertTrue(port >= 20000 && port < 33000, "port " + port + " not in expected range"); + assertTrue(port > 0); assertTrue(PortManager.checkPortIfLocked(port)); } finally { PortManager.releaseLockedPort(port); From 2073eb363890a9e0d665dd3f6522096ca0413a55 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 6 May 2026 13:37:48 -0700 Subject: [PATCH 6/7] Fix CI failures from cookie collisions and worker-id duplication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two distinct issues surfaced on https://github.com/apache/pulsar/actions/runs/25449522387: 1. BookKeeperClusterTestCase (3 copies in pulsar-metadata, managed-ledger, pulsar-package-management) now starts bookies on port 0. Multiple bookies ended up sharing the cookie identity 127.0.0.1:0, breaking cookie validation on the second bookie. This code path (unlike BKCluster, which uses bookieId in metadata) needs the bookie to be reachable by host:port for the test client's address resolver to work — bookieId-only registration leads to UnknownHostException when clients try to dial the bookie. Fix: route through PortManager.nextLockedFreePort() again for these cases. This is exactly the use case lhotari called out in the PR review. 2. PulsarFunctionTlsTest configured both brokers with WebServicePortTls=0. PulsarService.initializeWorkerConfigFromBrokerConfig builds the workerId from "{cluster}-fw-{host}-{configuredPort}", which means both brokers got the same workerId, the function-worker membership manager never elected a leader, and the test timed out waiting for one. Fix: pre-allocate WebServicePortTls via PortManager so each broker gets a unique configured port (and therefore a unique workerId). --- .../bookkeeper/test/BookKeeperClusterTestCase.java | 8 ++++++-- .../pulsar/functions/worker/PulsarFunctionTlsTest.java | 9 ++++++++- .../replication/BookKeeperClusterTestCase.java | 8 ++++++-- .../bookkeeper/test/BookKeeperClusterTestCase.java | 8 ++++++-- 4 files changed, 26 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 378f7720d5f54..7e5486eac293d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -64,6 +64,7 @@ import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.replication.Auditor; import org.apache.bookkeeper.replication.ReplicationWorker; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; @@ -289,12 +290,15 @@ protected void stopBKCluster() throws Exception { t.shutdown(); } servers.clear(); - bookiePorts.clear(); + bookiePorts.removeIf(PortManager::releaseLockedPort); } protected ServerConfiguration newServerConfiguration() throws Exception { File f = tmpDirs.createNew("bookie", "test"); - return newServerConfiguration(0, f, new File[] { f }); + // Bookies need a pre-allocated port: BK identifies them by host:port in metadata + // and the test client resolves that back to a TCP address. Port 0 would leave + // the cookie + registration with port=0, which fails DNS-style resolution. + return newServerConfiguration(PortManager.nextLockedFreePort(), f, new File[] { f }); } protected ClientConfiguration newClientConfiguration() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 734a3e473c093..9727dfb8f41bc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -48,6 +48,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; @@ -104,7 +105,11 @@ void setup() throws Exception { config.setBrokerShutdownTimeoutMs(0L); config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setWebServicePort(Optional.empty()); - config.setWebServicePortTls(Optional.of(0)); + // Pre-allocate the TLS web port: PulsarService.initializeWorkerConfigFromBrokerConfig + // builds workerId = "c-{cluster}-fw-{host}-{port}" from the CONFIGURED port. Two + // brokers configured with port 0 would end up with the same workerId and the + // function-worker membership manager would never elect a leader. + config.setWebServicePortTls(Optional.of(PortManager.nextLockedFreePort())); config.setBrokerServicePort(Optional.empty()); config.setBrokerServicePortTls(Optional.of(0)); config.setClusterName("my-cluster"); @@ -215,6 +220,8 @@ void tearDown() throws Exception { } for (int i = 0; i < BROKER_COUNT; i++) { if (pulsarServices[i] != null) { + pulsarServices[i].getConfiguration() + .getWebServicePortTls().ifPresent(PortManager::releaseLockedPort); pulsarServices[i].close(); pulsarServices[i] = null; } diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java index b3f1786bf810b..612a05fba7ff0 100644 --- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java +++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java @@ -64,6 +64,7 @@ import org.apache.bookkeeper.test.TmpDirs; import org.apache.bookkeeper.test.ZooKeeperCluster; import org.apache.bookkeeper.test.ZooKeeperClusterUtil; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; @@ -309,12 +310,15 @@ protected void stopBKCluster() throws Exception { t.shutdown(); } servers.clear(); - bookiePorts.clear(); + bookiePorts.removeIf(PortManager::releaseLockedPort); } protected ServerConfiguration newServerConfiguration() throws Exception { File f = tmpDirs.createNew("bookie", "test"); - return newServerConfiguration(0, f, new File[] { f }); + // Bookies need a pre-allocated port: BK identifies them by host:port in metadata + // and the test client resolves that back to a TCP address. Port 0 would leave + // the cookie + registration with port=0, which fails DNS-style resolution. + return newServerConfiguration(PortManager.nextLockedFreePort(), f, new File[] { f }); } protected ClientConfiguration newClientConfiguration() { diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java index b1d2e47cc4580..d8a06a04e2c6e 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java @@ -66,6 +66,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; +import org.apache.pulsar.common.util.PortManager; import org.apache.bookkeeper.discover.BookieServiceInfo; import org.apache.bookkeeper.discover.RegistrationManager; import org.apache.bookkeeper.meta.LedgerManager; @@ -284,12 +285,15 @@ protected void stopBKCluster() throws Exception { t.shutdown(); } servers.clear(); - bookiePorts.clear(); + bookiePorts.removeIf(PortManager::releaseLockedPort); } protected ServerConfiguration newServerConfiguration() throws Exception { File f = tmpDirs.createNew("bookie", "test"); - return newServerConfiguration(0, f, new File[] { f }); + // Bookies need a pre-allocated port: BK identifies them by host:port in metadata + // and the test client resolves that back to a TCP address. Port 0 would leave + // the cookie + registration with port=0, which fails DNS-style resolution. + return newServerConfiguration(PortManager.nextLockedFreePort(), f, new File[] { f }); } protected ClientConfiguration newClientConfiguration() { From f3aeb3e3a80507556d1e5851b6d1715a8e69db6e Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 6 May 2026 13:53:03 -0700 Subject: [PATCH 7/7] Fix import order to satisfy checkstyle --- .../bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java index d8a06a04e2c6e..33080e42832bc 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java @@ -66,7 +66,6 @@ import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; -import org.apache.pulsar.common.util.PortManager; import org.apache.bookkeeper.discover.BookieServiceInfo; import org.apache.bookkeeper.discover.RegistrationManager; import org.apache.bookkeeper.meta.LedgerManager; @@ -85,6 +84,7 @@ import org.apache.bookkeeper.test.ZooKeeperCluster; import org.apache.bookkeeper.test.ZooKeeperClusterUtil; import org.apache.bookkeeper.util.DiskChecker; +import org.apache.pulsar.common.util.PortManager; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.testng.annotations.AfterMethod;