Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -506,31 +506,59 @@ public void startBK() throws Exception {
public void stop() throws Exception {
if (null != streamStorage) {
log.debug("Local bk stream storage stopping ...");
streamStorage.close();
try {
streamStorage.close();
} catch (Exception e) {
log.warn().exception(e).log("failed to shutdown stream storage");
}
}

log.debug("Local ZK/BK stopping ...");
for (LifecycleComponent bookie : bookieComponents) {
try {
if (bookie != null) {
bookie.close();
if (bookieComponents != null) {
for (LifecycleComponent bookie : bookieComponents) {
try {
if (bookie != null) {
bookie.close();
}
} catch (Exception e) {
log.warn().exception(e).log("failed to shutdown bookie");
}
} catch (Exception e) {
log.warn().exception(e).log("failed to shutdown bookie");
}
}

zkc.close();
zks.shutdown();
serverFactory.shutdown();
if (zkc != null) {
try {
zkc.close();
} catch (Exception e) {
log.warn().exception(e).log("failed to close zk client");
}
}
if (zks != null) {
try {
zks.shutdown();
} catch (Exception e) {
log.warn().exception(e).log("failed to shutdown zk server");
}
}
if (serverFactory != null) {
try {
serverFactory.shutdown();
} catch (Exception e) {
log.warn().exception(e).log("failed to shutdown zk server factory");
}
}

if (zkDataCleanupManager != null) {
zkDataCleanupManager.shutdown();
}
log.debug("Local ZK/BK stopped");
for (File managedDir : temporaryDirectories) {
log.info().attr("directory", managedDir).log("deleting test directory");
FileUtils.deleteDirectory(managedDir);
try {
FileUtils.deleteDirectory(managedDir);
} catch (Exception e) {
log.warn().attr("directory", managedDir).exception(e).log("failed to delete test directory");
}
}
temporaryDirectories.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.testng.Assert;
Expand All @@ -42,14 +41,17 @@
public class ShadowTopicRealBkTest {

private static final String cluster = "test";
private final int zkPort = PortManager.nextLockedFreePort();
private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort);
// Pass 0 for both ZK and bookie ports so the kernel picks free ports at bind time, avoiding
// any JVM-vs-OS race on pre-allocated ports. The actual ZK port is read back via
// bk.getZookeeperPort().
private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0, () -> 0);
private PulsarService pulsar;
private PulsarAdmin admin;

@BeforeClass
public void setup() throws Exception {
bk.start();
final int zkPort = bk.getZookeeperPort();
final var config = new ServiceConfiguration();
config.setClusterName(cluster);
config.setAdvertisedAddress("localhost");
Expand All @@ -68,7 +70,11 @@ public void setup() throws Exception {
@AfterClass(alwaysRun = true)
public void cleanup() throws Exception {
if (pulsar != null) {
pulsar.close();
try {
pulsar.close();
} catch (Exception e) {
// best effort cleanup; setup may have failed before pulsar was fully initialized
}
}
bk.stop();
}
Expand Down
Loading