Skip to content

Commit b3d598b

Browse files
committed
unify topics.remove code.
1 parent 17fb913 commit b3d598b

1 file changed

Lines changed: 39 additions & 32 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,7 +1006,7 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
10061006
* 0. If topic future exists in the cache returned directly regardless of whether it fails or timeout.
10071007
* 1. If the topic metadata exists, the topic is created regardless of {@code createIfMissing}.
10081008
* 2. If the topic metadata not exists, and {@code createIfMissing} is false,
1009-
* returns an empty Optional in a CompletableFuture.
1009+
* returns an empty Optional in a CompletableFuture. And this empty future not be added to the map.
10101010
* 3. Otherwise, use computeIfAbsent. It returns the existing topic or creates and adds a new topicFuture.
10111011
* Any exceptions will remove the topicFuture from the map.
10121012
*
@@ -1039,7 +1039,8 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
10391039
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
10401040
}).thenCompose(optionalTopicPolicies -> {
10411041
final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null);
1042-
return topics.computeIfAbsent(topicName.toString(), (tpName) -> {
1042+
CompletableFuture<Optional<Topic>> topicFuture = topics.computeIfAbsent(topicName.toString(),
1043+
(tpName) -> {
10431044
if (topicName.isPartitioned()) {
10441045
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
10451046
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
@@ -1054,23 +1055,26 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
10541055
final String errorMsg =
10551056
String.format("Illegal topic partition name %s with max allowed "
10561057
+ "%d partitions", topicName, metadata.partitions);
1057-
pulsar.getExecutor().execute(() -> topics.remove(topicName.toString()));
10581058
log.warn(errorMsg);
10591059
return FutureUtil.failedFuture(
10601060
new BrokerServiceException.NotAllowedException(errorMsg));
10611061
});
10621062
}
10631063
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
1064-
}).thenCompose(optionalTopic -> {
1065-
if (!optionalTopic.isPresent() && createIfMissing) {
1066-
log.warn("[{}] Try to recreate the topic with createIfMissing=true "
1067-
+ "but the returned topic is empty", topicName);
1068-
// Before retry create topic, need remove it from topics.
1069-
topics.remove(topicName.toString());
1070-
return getTopic(topicName, createIfMissing, properties);
1064+
});
1065+
topicFuture.whenComplete((__, ex) -> {
1066+
if (ex != null) {
1067+
// Remove the topic future from the map if the topic creation fails.
1068+
// If the failure is due to a timeout, do not remove it immediately. Instead
1069+
// wait for the original thread that initiated the topic creation to remove it,
1070+
// as it is essential to ensure that all related resources are closed before removal.
1071+
if (!ex.equals(FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION)) {
1072+
pulsar.getExecutor().execute(() ->
1073+
topics.remove(topicName.toString(), topicFuture));
1074+
}
10711075
}
1072-
return CompletableFuture.completedFuture(optionalTopic);
10731076
});
1077+
return topicFuture;
10741078
});
10751079
});
10761080
} else {
@@ -1081,7 +1085,8 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
10811085
final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
10821086
return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
10831087
if (topicName.getPartitionIndex() < metadata.partitions) {
1084-
return topics.computeIfAbsent(topicName.toString(), (name) -> {
1088+
CompletableFuture<Optional<Topic>> topicFuture = topics.computeIfAbsent(
1089+
topicName.toString(), (name) -> {
10851090
topicEventsDispatcher
10861091
.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);
10871092

@@ -1093,12 +1098,20 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
10931098
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
10941099
return res;
10951100
});
1101+
topicFuture.whenComplete((__, ex) -> {
1102+
if (ex != null) {
1103+
pulsar.getExecutor().execute(() ->
1104+
topics.remove(topicName.toString(), topicFuture));
1105+
}
1106+
});
1107+
return topicFuture;
10961108
}
10971109
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
10981110
return CompletableFuture.completedFuture(Optional.empty());
10991111
});
11001112
} else if (createIfMissing) {
1101-
return topics.computeIfAbsent(topicName.toString(), (name) -> {
1113+
CompletableFuture<Optional<Topic>> topicFuture = topics.computeIfAbsent(topicName.toString(),
1114+
(name) -> {
11021115
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);
11031116

11041117
CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name);
@@ -1109,13 +1122,20 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
11091122
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
11101123
return res;
11111124
});
1125+
topicFuture.whenComplete((__, ex) -> {
1126+
if (ex != null) {
1127+
pulsar.getExecutor().execute(() ->
1128+
topics.remove(topicName.toString(), topicFuture));
1129+
}
1130+
});
1131+
return topicFuture;
11121132
} else {
1113-
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
1114-
if (topicFuture == null) {
1133+
CompletableFuture<Optional<Topic>> res = topics.get(topicName.toString());
1134+
if (res == null) {
11151135
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
1116-
topicFuture = CompletableFuture.completedFuture(Optional.empty());
1136+
res = CompletableFuture.completedFuture(Optional.empty());
11171137
}
1118-
return topicFuture;
1138+
return res;
11191139
}
11201140
}
11211141
} catch (IllegalArgumentException e) {
@@ -1261,7 +1281,6 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
12611281
if (log.isDebugEnabled()) {
12621282
log.debug("Broker is unable to load non-persistent topic {}", topic);
12631283
}
1264-
pulsar.getExecutor().execute(() -> topics.remove(topic));
12651284
return FutureUtil.failedFuture(
12661285
new NotAllowedException("Broker is not unable to load non-persistent topic"));
12671286
}
@@ -1271,7 +1290,6 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
12711290
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
12721291
} catch (Throwable e) {
12731292
log.warn("Failed to create topic {}", topic, e);
1274-
pulsar.getExecutor().execute(() -> topics.remove(topic));
12751293
topicFuture.completeExceptionally(e);
12761294
return topicFuture;
12771295
}
@@ -1288,7 +1306,6 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
12881306
}).exceptionally(ex -> {
12891307
log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
12901308
nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
1291-
pulsar.getExecutor().execute(() -> topics.remove(topic));
12921309
topicFuture.completeExceptionally(ex);
12931310
});
12941311
return null;
@@ -1304,7 +1321,7 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
13041321
topicFuture.complete(Optional.of(nonPersistentTopic));
13051322
// after get metadata return success, we should delete this topic from this broker, because this topic not
13061323
// owner by this broker and it don't initialize and checkReplication
1307-
pulsar.getExecutor().execute(() -> topics.remove(topic));
1324+
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
13081325
return null;
13091326
});
13101327

@@ -1543,7 +1560,6 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
15431560
if (log.isDebugEnabled()) {
15441561
log.debug("Broker is unable to load persistent topic {}", topic);
15451562
}
1546-
pulsar.getExecutor().execute(() -> topics.remove(topic));
15471563
topicFuture.completeExceptionally(new NotAllowedException(
15481564
"Broker is unable to load persistent topic"));
15491565
return topicFuture;
@@ -1562,7 +1578,6 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
15621578
// do not recreate topic if topic is already migrated and deleted by broker
15631579
// so, avoid creating a new topic if migration is already started
15641580
if (ex != null && (ex.getCause() instanceof TopicMigratedException)) {
1565-
pulsar.getExecutor().execute(() -> topics.remove(topic));
15661581
topicFuture.completeExceptionally(ex.getCause());
15671582
return null;
15681583
}
@@ -1577,7 +1592,6 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
15771592
}
15781593
}
15791594
}).exceptionally(ex -> {
1580-
pulsar.getExecutor().execute(() -> topics.remove(topic));
15811595
topicFuture.completeExceptionally(ex.getCause());
15821596
return null;
15831597
});
@@ -1631,15 +1645,13 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean
16311645
finalProperties, topicPolicies)
16321646
).exceptionally(throwable -> {
16331647
log.warn("[{}] Read topic property failed", topic, throwable);
1634-
pulsar.getExecutor().execute(() -> topics.remove(topic));
16351648
topicFuture.completeExceptionally(throwable);
16361649
return null;
16371650
});
16381651
} else {
16391652
// namespace is being unloaded
16401653
String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
16411654
log.warn(msg);
1642-
pulsar.getExecutor().execute(() -> topics.remove(topic));
16431655
topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
16441656
}
16451657
}).exceptionally(ex -> {
@@ -1670,7 +1682,6 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
16701682
if (isTransactionInternalName(topicName)) {
16711683
String msg = String.format("Can not create transaction system topic %s", topic);
16721684
log.warn(msg);
1673-
pulsar.getExecutor().execute(() -> topics.remove(topic));
16741685
topicFuture.completeExceptionally(new NotAllowedException(msg));
16751686
return;
16761687
}
@@ -1752,7 +1763,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
17521763
+ " topic", topic, FutureUtil.getException(topicFuture));
17531764
executor().submit(() -> {
17541765
persistentTopic.close().whenComplete((ignore, ex) -> {
1755-
topics.remove(topic);
1766+
topics.remove(topic, topicFuture);
17561767
if (ex != null) {
17571768
log.warn("[{}] Get an error when closing topic.",
17581769
topic, ex);
@@ -1769,7 +1780,6 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
17691780
+ " Removing topic from topics list {}, {}", topic, ex);
17701781
executor().submit(() -> {
17711782
persistentTopic.close().whenComplete((ignore, closeEx) -> {
1772-
topics.remove(topic);
17731783
if (closeEx != null) {
17741784
log.warn("[{}] Get an error when closing topic.",
17751785
topic, closeEx);
@@ -1781,7 +1791,6 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
17811791
});
17821792
} catch (PulsarServerException e) {
17831793
log.warn("Failed to create topic {}: {}", topic, e.getMessage());
1784-
pulsar.getExecutor().execute(() -> topics.remove(topic));
17851794
topicFuture.completeExceptionally(e);
17861795
}
17871796
}
@@ -1794,7 +1803,6 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
17941803
topicFuture.complete(Optional.empty());
17951804
} else {
17961805
log.warn("Failed to create topic {}", topic, exception);
1797-
pulsar.getExecutor().execute(() -> topics.remove(topic));
17981806
topicFuture.completeExceptionally(new PersistenceException(exception));
17991807
}
18001808
}
@@ -1804,7 +1812,6 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
18041812
log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception);
18051813
// remove topic from topics-map in different thread to avoid possible deadlock if
18061814
// createPersistentTopic-thread only tries to handle this future-result
1807-
pulsar.getExecutor().execute(() -> topics.remove(topic));
18081815
topicFuture.completeExceptionally(exception);
18091816
return null;
18101817
});

0 commit comments

Comments
 (0)