Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1117,31 +1117,7 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
// The topic level policies are not needed now, but the meaning of calling
// "getTopicPoliciesBypassSystemTopic" will wait for system topic policies initialization.
getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY)
.thenCompose(optionalTopicPolicies -> {
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow creating non-partitioned persistent topic that name includes
// `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
loadOrCreatePersistentTopic(context));
} else {
final String errorMsg =
String.format("Illegal topic partition name %s with max allowed "
+ "%d partitions", topicName, metadata.partitions);
log.warn(errorMsg);
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException(errorMsg));
}
});
} else {
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
loadOrCreatePersistentTopic(context));
}
}).thenRun(() -> {
.thenRun(() -> {
final var inserted = new MutableBoolean(false);
final var cachedFuture = topics.computeIfAbsent(topicName.toString(), ___ -> {
inserted.setTrue();
Expand Down Expand Up @@ -1678,9 +1654,32 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional<ClusterData> c
* loading and puts them into queue once in-process topics are created.
*/
protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(TopicLoadingContext context) {
final var topicName = context.getTopicName();
final var topic = context.getTopicName().toString();
final CompletableFuture<Void> ownedFuture;
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
ownedFuture = fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow creating non-partitioned persistent topic that name includes
// `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return checkTopicNsOwnership(topic);
} else {
final String errorMsg =
String.format("Illegal topic partition name %s with max allowed "
+ "%d partitions", topicName, metadata.partitions);
log.warn(errorMsg);
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException(errorMsg));
}
});
} else {
ownedFuture = checkTopicNsOwnership(topic);
}
final var topicFuture = context.getTopicFuture();
checkTopicNsOwnership(topic)
ownedFuture
.thenRun(() -> {
final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();

Expand Down
Loading