Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
Expand Down Expand Up @@ -134,7 +133,7 @@ public void testBookieFailure() throws Exception {
metadataStore.unsetAlwaysFail();

bkc = new PulsarBookKeeperTestClient(baseClientConf);
int port = startNewBookie();
startNewBookie();

// Reconnect a new bk client
factory.shutdown();
Expand Down Expand Up @@ -164,7 +163,6 @@ public void testBookieFailure() throws Exception {
assertEquals("entry-2", new String(entries.get(0).getData()));
entries.forEach(Entry::release);
factory.shutdown();
releaseLockedPort(port);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package org.apache.bookkeeper.test;

import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertFalse;
import com.google.common.base.Stopwatch;
import java.io.File;
Expand Down Expand Up @@ -296,14 +295,10 @@ protected void stopBKCluster() throws Exception {

protected ServerConfiguration newServerConfiguration() throws Exception {
File f = tmpDirs.createNew("bookie", "test");

int port;
if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) {
port = nextLockedFreePort();
} else {
port = 0;
}
return newServerConfiguration(port, f, new File[] { f });
// Bookies need a pre-allocated port: BK identifies them by host:port in metadata
// and the test client resolves that back to a TCP address. Port 0 would leave
// the cookie + registration with port=0, which fails DNS-style resolution.
return newServerConfiguration(PortManager.nextLockedFreePort(), f, new File[] { f });
}

protected ClientConfiguration newClientConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ public void setBkEnsemble(LocalBookkeeperEnsemble bkEnsemble) {
this.bkEnsemble = bkEnsemble;
}

public void setBkPort(int bkPort) {
this.bkPort = bkPort;
}

public void setBkDir(String bkDir) {
this.bkDir = bkDir;
}
Expand Down Expand Up @@ -172,10 +168,6 @@ public int getZkPort() {
return zkPort;
}

public int getBkPort() {
return bkPort;
}

public String getZkDir() {
return zkDir;
}
Expand Down Expand Up @@ -237,9 +229,6 @@ public boolean isHelp() {
hidden = true)
private int zkPort = 2181;

@Option(names = { "--bookkeeper-port" }, description = "Local bookies base port")
private int bkPort = 3181;

@Option(names = { "--zookeeper-dir" },
description = "Local zooKeeper's data directory",
hidden = true)
Expand Down Expand Up @@ -470,7 +459,6 @@ void startBookieWithMetadataStore() throws Exception {
bkCluster = BKCluster.builder()
.baseServerConfiguration(bkServerConf)
.metadataServiceUri(metadataStoreUrl)
.bkPort(bkPort)
.numBookies(numOfBk)
.dataDir(bkDir)
.clearOldData(wipeData)
Expand All @@ -484,9 +472,9 @@ private void startBookieWithZookeeper() throws Exception {
ServerConfiguration bkServerConf = new ServerConfiguration();
bkServerConf.loadConf(new File(configFile).toURI().toURL());
calculateCacheSize(bkServerConf);
// Start LocalBookKeeper
// Start LocalBookKeeper. Bookies bind to kernel-assigned ports.
bkEnsemble = new LocalBookkeeperEnsemble(
this.getNumOfBk(), this.getZkPort(), this.getBkPort(), this.getStreamStoragePort(), this.getZkDir(),
this.getNumOfBk(), this.getZkPort(), this.getStreamStoragePort(), this.getZkDir(),
this.getBkDir(), this.isWipeData(), "127.0.0.1");
bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage());
config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ public PulsarStandaloneBuilder withZkPort(int zkPort) {
return this;
}

public PulsarStandaloneBuilder withBkPort(int bkPort) {
pulsarStandalone.setBkPort(bkPort);
return this;
}

public PulsarStandaloneBuilder withZkDir(String zkDir) {
pulsarStandalone.setZkDir(zkDir);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import lombok.CustomLog;
Expand Down Expand Up @@ -91,46 +90,18 @@ public class LocalBookkeeperEnsemble {
int numberOfBookies;
private final boolean clearOldData;

private static class BasePortManager implements Supplier<Integer> {

private int port;

public BasePortManager(int basePort) {
this.port = basePort;
}

@Override
public synchronized Integer get() {
return port++;
}
public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort) {
this(numberOfBookies, zkPort, 4181, null, null, true, null);
}

private final Supplier<Integer> portManager;

public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, Supplier<Integer> portManager) {
this(numberOfBookies, zkPort, 4181, null, null, true, null, portManager);
}

public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName,
public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String zkDataDirName,
String bkDataDirName, boolean clearOldData) {
this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, null);
this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, null);
}

public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName,
public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String zkDataDirName,
String bkDataDirName, boolean clearOldData, String advertisedAddress) {
this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress);
}

public LocalBookkeeperEnsemble(int numberOfBookies,
int zkPort,
int bkBasePort,
int streamStoragePort,
String zkDataDirName,
String bkDataDirName,
boolean clearOldData,
String advertisedAddress) {
this(numberOfBookies, zkPort, streamStoragePort, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress,
bkBasePort != 0 ? new BasePortManager(bkBasePort) : () -> 0);
this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress);
}

public LocalBookkeeperEnsemble(int numberOfBookies,
Expand All @@ -139,10 +110,8 @@ public LocalBookkeeperEnsemble(int numberOfBookies,
String zkDataDirName,
String bkDataDirName,
boolean clearOldData,
String advertisedAddress,
Supplier<Integer> portManager) {
String advertisedAddress) {
this.numberOfBookies = numberOfBookies;
this.portManager = portManager;
this.streamStoragePort = streamStoragePort;
this.zkDataDirName = zkDataDirName;
this.bkDataDirName = bkDataDirName;
Expand Down Expand Up @@ -301,7 +270,8 @@ private void runBookies(ServerConfiguration baseConf) throws Exception {
cleanDirectory(bkDataDir);
}

int bookiePort = portManager.get();
// Bookies bind to a kernel-assigned port; identity is established via bookieId.
int bookiePort = 0;
String bookieId = "bk" + i + "test";
// Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully
deleteBookieRegistrationZnode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public void testStandaloneWithRocksDB() throws Exception {

PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args);
standalone.setBkDir(tempDir.getAbsolutePath());
standalone.setBkPort(0);
standalone.setNumOfBk(bookieNum);

standalone.startBookieWithMetadataStore();
Expand All @@ -67,10 +66,12 @@ public void testStandaloneWithRocksDB() throws Exception {
List<ServerConfiguration> secondBsConfs = standalone.bkCluster.getBsConfs();
Assert.assertEquals(secondBsConfs.size(), bookieNum);

// Cookies must be preserved across restart (otherwise bookie startup would have failed
// with InvalidCookieException). The bookieId is the persistent identity.
for (int i = 0; i < bookieNum; i++) {
ServerConfiguration conf1 = firstBsConfs.get(i);
ServerConfiguration conf2 = secondBsConfs.get(i);
Assert.assertEquals(conf1.getBookiePort(), conf2.getBookiePort());
Assert.assertEquals(conf1.getBookieId(), conf2.getBookieId());
}
standalone.close();
cleanDirectory(tempDir);
Expand All @@ -93,7 +94,6 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex
}
final File bkDir = IOUtils.createTempDir("standalone", "bk");
standalone.setNumOfBk(1);
standalone.setBkPort(0);
standalone.setBkDir(bkDir.getAbsolutePath());
standalone.start();

Expand Down Expand Up @@ -148,7 +148,6 @@ public void testShutdownHookClosesBkCluster() throws Exception {
bkDir.getAbsolutePath()
});
standalone.setTestMode(true);
standalone.setBkPort(0);
standalone.start();
BKCluster bkCluster = standalone.bkCluster;
standalone.runShutdownHook();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.CustomLog;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
Expand All @@ -35,7 +34,6 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.PortManager;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;

Expand All @@ -46,24 +44,18 @@ public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest {
protected List<PulsarAdmin> additionalBrokerAdmins;
protected List<PulsarClient> additionalBrokerClients;
protected PulsarMockBookKeeper mockBookKeeper;
// Populated after broker startup with kernel-assigned ports.
protected int mainBrokerPort;
protected List<Integer> additionalBrokerPorts = new ArrayList<>();

protected int numberOfAdditionalBrokers() {
return 2;
}

protected boolean useDynamicBrokerPorts() {
return true;
}

@BeforeClass(alwaysRun = true)
@Override
public final void setup() throws Exception {
beforeSetup();
if (!useDynamicBrokerPorts()) {
mainBrokerPort = PortManager.nextLockedFreePort();
}
OrderedExecutor mockBookKeeperExecutor = OrderedExecutor.newBuilder().numThreads(1)
.name(MultiBrokerBaseTest.class.getSimpleName() + "-bk-executor").build();
registerCloseable(() -> GracefulExecutorServicesShutdown.initiate()
Expand All @@ -75,6 +67,7 @@ public final void setup() throws Exception {
((NonClosableMockBookKeeper) mockBookKeeper).reallyShutdown();
});
super.internalSetup();
mainBrokerPort = pulsar.getBrokerListenPort().orElse(0);
additionalBrokersSetup();
pulsarResourcesSetup();
additionalSetup();
Expand All @@ -89,14 +82,6 @@ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder p
pulsarTestContextBuilder.bookKeeperClient(mockBookKeeper);
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
if (!useDynamicBrokerPorts()) {
conf.setBrokerServicePort(Optional.of(mainBrokerPort));
}
}

protected void additionalSetup() throws Exception {
// override this method to add any additional setup logic

Expand All @@ -115,17 +100,12 @@ protected void additionalBrokersSetup() throws Exception {
additionalBrokerClients = new ArrayList<>(numberOfAdditionalBrokers);
additionalPulsarTestContexts = new ArrayList<>(numberOfAdditionalBrokers);
additionalBrokerPorts = new ArrayList<>(numberOfAdditionalBrokers);
if (!useDynamicBrokerPorts()) {
for (int i = 0; i < numberOfAdditionalBrokers; i++) {
int port = PortManager.nextLockedFreePort();
additionalBrokerPorts.add(port);
}
}
for (int i = 0; i < numberOfAdditionalBrokers; i++) {
PulsarTestContext pulsarTestContext = createAdditionalBroker(i);
additionalPulsarTestContexts.add(i, pulsarTestContext);
PulsarService pulsarService = pulsarTestContext.getPulsarService();
additionalBrokers.add(i, pulsarService);
additionalBrokerPorts.add(pulsarService.getBrokerListenPort().orElse(0));
PulsarAdminBuilder pulsarAdminBuilder =
PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress() != null
? pulsarService.getWebServiceAddress()
Expand Down Expand Up @@ -159,9 +139,6 @@ protected ServiceConfiguration createConfForAdditionalBroker(int additionalBroke

protected PulsarTestContext createAdditionalBroker(int additionalBrokerIndex) throws Exception {
ServiceConfiguration conf = createConfForAdditionalBroker(additionalBrokerIndex);
if (!useDynamicBrokerPorts()) {
conf.setBrokerServicePort(Optional.of(additionalBrokerPorts.get(additionalBrokerIndex)));
}
return createAdditionalPulsarTestContext(conf);
}

Expand All @@ -175,14 +152,6 @@ public final void cleanup() throws Exception {
log.warn().exception(e).log("Exception during additional cleanup");
}
super.internalCleanup();
if (!useDynamicBrokerPorts()) {
if (mainBrokerPort > 0) {
PortManager.releaseLockedPort(mainBrokerPort);
}
for (Integer port : additionalBrokerPorts) {
PortManager.releaseLockedPort(port);
}
}
}

protected void additionalCleanup() throws Exception {
Expand Down Expand Up @@ -212,9 +181,6 @@ protected void additionalBrokersCleanup() {
try {
pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L);
pulsarTestContext.close();
pulsarService.getConfiguration().getBrokerServicePort().ifPresent(PortManager::releaseLockedPort);
pulsarService.getConfiguration().getWebServicePort().ifPresent(PortManager::releaseLockedPort);
pulsarService.getConfiguration().getWebServicePortTls().ifPresent(PortManager::releaseLockedPort);
} catch (Exception e) {
log.warn().exception(e).log("Failed to stop additional broker");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void setup() throws Exception {
new LinkedBlockingQueue<>());
log.info("---- Initializing SLAMonitoringTest -----");
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();

// start brokers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class GetPartitionMetadataTest extends TestRetrySupport {
@BeforeClass(alwaysRun = true)
protected void setup() throws Exception {
incrementSetupNumber();
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
// Start broker.
setupBrokers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,6 @@ protected int numberOfAdditionalBrokers() {
return 1;
}

@Override
protected boolean useDynamicBrokerPorts() {
return false;
}

@BeforeMethod(alwaysRun = true)
public final void doBeforeMethod() {
beforeMethod();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ protected ServiceConfiguration createConfForAdditionalBroker(int additionalBroke
}

private void updateConfig(ServiceConfiguration conf, String advertisedAddress) {
// Pre-allocate ports because the advertised listener URLs are baked into config
// before the broker starts. The broker then binds to the same ports.
int pulsarPort = nextLockedFreePort();
int httpPort = nextLockedFreePort();
int httpsPort = nextLockedFreePort();
Expand Down
Loading
Loading