in pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java [219:357]
/**
 * Runs one attempt at deleting {@code namespaceName} asynchronously and reports the outcome through
 * {@code callback}.
 *
 * <p>A single attempt performs, in order:
 * <ol>
 *   <li>{@code precheckWhenDeleteNamespace}, which yields the namespace policies (a {@code null} value is
 *       tolerated by the code below).</li>
 *   <li>Listing the topics and the partitioned topics of the namespace and sorting them into user-created
 *       topics, system topics and topic-policies system topics. Unless {@code force} is set, the presence
 *       of any user-created topic fails the attempt with {@code Status.CONFLICT}.</li>
 *   <li>Setting {@code deleted = true} on the policies, unless they are already flagged.</li>
 *   <li>Deleting the topics: user-created ones first, then system topics, and the topic-policies system
 *       topics last.</li>
 *   <li>Deleting, through the admin client, every bundle for which an ownership record is present, and
 *       finally calling {@code internalClearZkSources()}.</li>
 * </ol>
 *
 * <p>When the attempt fails with a {@code MetadataStoreException} whose cause is a
 * {@code KeeperException.NotEmptyException}, the whole attempt is run again as long as
 * {@code retryTimes - 1 > 0}; once that budget is used up the callback is failed with
 * {@code Status.CONFLICT}. Every other failure is handed to the callback as received.
 *
 * @param force      skips the "namespace must be empty" check and is forwarded to the precheck and to the
 *                   bundle deletion
 * @param retryTimes number of attempts still allowed, counting this one
 * @param callback   completed normally on success, exceptionally otherwise
 */
private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTimes,
                                                    @NonNull CompletableFuture<Void> callback) {
    precheckWhenDeleteNamespace(namespaceName, force)
            .thenCompose(policies -> {
                // Without replication clusters only the persistent topics are listed.
                final CompletableFuture<List<String>> topicsFuture;
                if (policies == null || CollectionUtils.isEmpty(policies.replication_clusters)) {
                    topicsFuture = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
                } else {
                    topicsFuture = pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
                }
                // The partitioned-topic listing is started only after the topic listing has completed.
                return topicsFuture.thenCompose(allTopics ->
                        pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName)
                                .thenCompose(allPartitionedTopics -> {
                    final Set<String> allUserCreatedTopics = new HashSet<>();
                    final Set<String> allUserCreatedPartitionTopics = new HashSet<>();
                    final Set<String> allSystemTopics = new HashSet<>();
                    final Set<String> allPartitionedSystemTopics = new HashSet<>();
                    final Set<String> topicPolicy = new HashSet<>();
                    final Set<String> partitionedTopicPolicy = new HashSet<>();

                    for (String topic : allTopics) {
                        if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
                            allUserCreatedTopics.add(topic);
                        } else if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
                            topicPolicy.add(topic);
                        } else if (!isDeletedAlongWithUserCreatedTopic(topic)) {
                            // System topics for which isDeletedAlongWithUserCreatedTopic is true are
                            // deliberately left out of every explicit deletion list.
                            allSystemTopics.add(topic);
                        }
                    }
                    for (String topic : allPartitionedTopics) {
                        if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
                            allUserCreatedPartitionTopics.add(topic);
                        } else if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
                            partitionedTopicPolicy.add(topic);
                        } else {
                            allPartitionedSystemTopics.add(topic);
                        }
                    }

                    final boolean hasNonSystemTopic =
                            !allUserCreatedTopics.isEmpty() || !allUserCreatedPartitionTopics.isEmpty();
                    if (!force && hasNonSystemTopic) {
                        throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace");
                    }

                    // Flag the namespace as deleted before removing anything, unless already flagged.
                    final CompletableFuture<Void> markDeleteFuture;
                    if (policies != null && policies.deleted) {
                        markDeleteFuture = CompletableFuture.completedFuture(null);
                    } else {
                        markDeleteFuture = namespaceResources().setPoliciesAsync(namespaceName, old -> {
                            old.deleted = true;
                            return old;
                        });
                    }

                    // Deletion order: user topics, then system topics, topic-policies topics last.
                    return markDeleteFuture
                            .thenCompose(ignore -> internalDeleteTopicsAsync(allUserCreatedTopics))
                            .thenCompose(ignore ->
                                    internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics))
                            .thenCompose(ignore -> internalDeleteTopicsAsync(allSystemTopics))
                            .thenCompose(ignore ->
                                    internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
                            .thenCompose(ignore -> internalDeleteTopicsAsync(topicPolicy))
                            .thenCompose(ignore ->
                                    internalDeletePartitionedTopicsAsync(partitionedTopicPolicy));
                }));
            })
            .thenCompose(ignore -> pulsar().getNamespaceService()
                    .getNamespaceBundleFactory().getBundlesAsync(namespaceName))
            .thenCompose(bundles -> FutureUtil.waitForAll(bundles.getBundles().stream()
                    .map(bundle -> pulsar().getNamespaceService().checkOwnershipPresentAsync(bundle)
                            .thenCompose(present -> {
                                // check if the bundle is owned by any broker,
                                // if not then we do not need to delete the bundle
                                if (!present) {
                                    log.warn("[{}] Skipping deleting namespace bundle {}/{} "
                                                    + "as it's not owned by any broker",
                                            clientAppId(), namespaceName, bundle.getBundleRange());
                                    return CompletableFuture.completedFuture(null);
                                }
                                PulsarAdmin admin;
                                try {
                                    admin = pulsar().getAdminClient();
                                } catch (PulsarServerException ex) {
                                    log.error("[{}] Get admin client error when preparing to delete topics.",
                                            clientAppId(), ex);
                                    return FutureUtil.failedFuture(ex);
                                }
                                log.info("[{}] Deleting namespace bundle {}/{}", clientAppId(),
                                        namespaceName, bundle.getBundleRange());
                                return admin.namespaces().deleteNamespaceBundleAsync(namespaceName.toString(),
                                        bundle.getBundleRange(), force);
                            }))
                    .collect(Collectors.toList())))
            .thenCompose(ignore -> internalClearZkSources())
            .whenComplete((result, error) -> {
                if (error == null) {
                    callback.complete(result);
                    return;
                }
                final Throwable rc = FutureUtil.unwrapCompletionException(error);
                final boolean notEmptyOnMetadata = rc instanceof MetadataStoreException
                        && rc.getCause() instanceof KeeperException.NotEmptyException;
                if (!notEmptyOnMetadata) {
                    callback.completeExceptionally(error);
                    return;
                }
                final KeeperException.NotEmptyException ne =
                        (KeeperException.NotEmptyException) rc.getCause();
                final int next = retryTimes - 1;
                if (next > 0) {
                    log.info("[{}] There are in-flight topics created during the namespace deletion, "
                                    + "retry to delete the namespace again. (path {} is not empty on metadata)",
                            namespaceName, ne.getPath());
                    try {
                        // async recursive
                        internalRetryableDeleteNamespaceAsync0(force, next, callback);
                    } catch (RuntimeException retryFailure) {
                        // The part of the retry that precedes its first async stage runs right here, and
                        // the future returned by whenComplete is discarded: without this catch a
                        // synchronous failure would vanish and the callback would never be completed.
                        callback.completeExceptionally(retryFailure);
                    }
                } else {
                    // drop out recursive
                    log.warn("[{}] Giving up deleting the namespace: retries are exhausted and path {} "
                            + "is still not empty on metadata", namespaceName, ne.getPath());
                    callback.completeExceptionally(
                            new RestException(Status.CONFLICT, "The broker still have in-flight topics"
                                    + " created during namespace deletion (path " + ne.getPath() + ") "
                                    + "is not empty on metadata store, please try again."));
                }
            });
}