private void internalRetryableDeleteNamespaceAsync0()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java [219:357]


    /**
     * Performs one attempt at deleting the current namespace and reports the outcome on {@code callback}.
     *
     * <p>Steps of a single attempt, in order:
     * <ol>
     *   <li>run {@code precheckWhenDeleteNamespace} and use the policies it yields;</li>
     *   <li>list the namespace topics and the partitioned topics, and sort them into user-created topics,
     *       system topics and topic-policies system topics;</li>
     *   <li>when {@code force} is {@code false} and at least one non-system topic exists, fail with
     *       {@code 409 CONFLICT};</li>
     *   <li>flag the namespace policies as deleted unless they already are;</li>
     *   <li>delete user-created topics first, then system topics, then the topic-policies topics
     *       (non-partitioned before partitioned within each group);</li>
     *   <li>delete every bundle for which an ownership record is present;</li>
     *   <li>call {@code internalClearZkSources()}.</li>
     * </ol>
     *
     * <p>If the attempt fails with a {@code MetadataStoreException} caused by a
     * {@code KeeperException.NotEmptyException}, the whole attempt is started again with
     * {@code retryTimes - 1} as long as that value is still positive; otherwise the callback is failed with
     * {@code 409 CONFLICT}. Any other failure is forwarded to the callback unchanged.
     *
     * @param force      whether the namespace may be deleted although it still contains non-system topics
     * @param retryTimes remaining attempt budget; a retry only happens while {@code retryTimes - 1 > 0}
     * @param callback   future completed (normally or exceptionally) once the deletion has finished
     */
    private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTimes,
                                                        @NonNull CompletableFuture<Void> callback) {
        precheckWhenDeleteNamespace(namespaceName, force)
                .thenCompose(policies -> {
                    // Without replication clusters only the persistent topics are listed.
                    final boolean noReplicationClusters =
                            policies == null || CollectionUtils.isEmpty(policies.replication_clusters);
                    final CompletableFuture<List<String>> topicListFuture = noReplicationClusters
                            ? pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
                            : pulsar().getNamespaceService().getFullListOfTopics(namespaceName);

                    return topicListFuture.thenCompose(topics ->
                            pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName)
                                    .thenCompose(partitionedTopics -> {
                                        final Set<String> userTopics = new HashSet<>();
                                        final Set<String> systemTopics = new HashSet<>();
                                        final Set<String> policyTopics = new HashSet<>();
                                        final Set<String> userPartitionedTopics = new HashSet<>();
                                        final Set<String> systemPartitionedTopics = new HashSet<>();
                                        final Set<String> policyPartitionedTopics = new HashSet<>();
                                        boolean foundUserTopic = false;

                                        // Classify the plain topics.
                                        for (String topic : topics) {
                                            if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
                                                foundUserTopic = true;
                                                userTopics.add(topic);
                                                continue;
                                            }
                                            if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
                                                policyTopics.add(topic);
                                            } else if (!isDeletedAlongWithUserCreatedTopic(topic)) {
                                                systemTopics.add(topic);
                                            }
                                        }

                                        // Classify the partitioned topics.
                                        for (String topic : partitionedTopics) {
                                            if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
                                                foundUserTopic = true;
                                                userPartitionedTopics.add(topic);
                                                continue;
                                            }
                                            if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
                                                policyPartitionedTopics.add(topic);
                                            } else {
                                                systemPartitionedTopics.add(topic);
                                            }
                                        }

                                        if (!force && foundUserTopic) {
                                            throw new RestException(Status.CONFLICT,
                                                    "Cannot delete non empty namespace");
                                        }

                                        // Flag the namespace as deleted before removing anything,
                                        // unless the flag is already set.
                                        final CompletableFuture<Void> markDeleted;
                                        if (policies == null || !policies.deleted) {
                                            markDeleted = namespaceResources().setPoliciesAsync(namespaceName,
                                                    current -> {
                                                        current.deleted = true;
                                                        return current;
                                                    });
                                        } else {
                                            markDeleted = CompletableFuture.completedFuture(null);
                                        }

                                        // User topics go first, the topic-policies topics go last.
                                        return markDeleted
                                                .thenCompose(ignored ->
                                                        internalDeleteTopicsAsync(userTopics))
                                                .thenCompose(ignored ->
                                                        internalDeletePartitionedTopicsAsync(userPartitionedTopics))
                                                .thenCompose(ignored ->
                                                        internalDeleteTopicsAsync(systemTopics))
                                                .thenCompose(ignored ->
                                                        internalDeletePartitionedTopicsAsync(systemPartitionedTopics))
                                                .thenCompose(ignored ->
                                                        internalDeleteTopicsAsync(policyTopics))
                                                .thenCompose(ignored ->
                                                        internalDeletePartitionedTopicsAsync(policyPartitionedTopics));
                                    }));
                })
                .thenCompose(ignored -> pulsar().getNamespaceService()
                        .getNamespaceBundleFactory().getBundlesAsync(namespaceName))
                .thenCompose(bundles -> FutureUtil.waitForAll(bundles.getBundles().stream()
                        .map(bundle -> pulsar().getNamespaceService().checkOwnershipPresentAsync(bundle)
                                .thenCompose(owned -> {
                                    // A bundle that no broker owns does not need to be deleted.
                                    if (!owned) {
                                        log.warn("[{}] Skipping deleting namespace bundle {}/{} "
                                                        + "as it's not owned by any broker",
                                                clientAppId(), namespaceName, bundle.getBundleRange());
                                        return CompletableFuture.completedFuture(null);
                                    }
                                    final PulsarAdmin admin;
                                    try {
                                        admin = pulsar().getAdminClient();
                                    } catch (PulsarServerException ex) {
                                        log.error("[{}] Get admin client error when preparing to delete topics.",
                                                clientAppId(), ex);
                                        return FutureUtil.failedFuture(ex);
                                    }
                                    log.info("[{}] Deleting namespace bundle {}/{}", clientAppId(),
                                            namespaceName, bundle.getBundleRange());
                                    return admin.namespaces().deleteNamespaceBundleAsync(namespaceName.toString(),
                                            bundle.getBundleRange(), force);
                                })
                        ).collect(Collectors.toList())))
                .thenCompose(ignored -> internalClearZkSources())
                .whenComplete((result, error) -> {
                    if (error == null) {
                        callback.complete(result);
                        return;
                    }
                    final Throwable rootCause = FutureUtil.unwrapCompletionException(error);
                    // instanceof is false for null, so a missing cause needs no separate check.
                    final boolean notEmptyOnMetadata = rootCause instanceof MetadataStoreException
                            && rootCause.getCause() instanceof KeeperException.NotEmptyException;
                    if (!notEmptyOnMetadata) {
                        callback.completeExceptionally(error);
                        return;
                    }
                    final KeeperException.NotEmptyException notEmpty =
                            (KeeperException.NotEmptyException) rootCause.getCause();
                    log.info("[{}] There are in-flight topics created during the namespace deletion, "
                            + "retry to delete the namespace again. (path {} is not empty on metadata)",
                            namespaceName, notEmpty.getPath());
                    final int remaining = retryTimes - 1;
                    if (remaining > 0) {
                        // Asynchronous recursion: start the whole attempt again.
                        internalRetryableDeleteNamespaceAsync0(force, remaining, callback);
                        return;
                    }
                    // Retry budget exhausted: leave the recursion and report the conflict.
                    callback.completeExceptionally(
                            new RestException(Status.CONFLICT, "The broker still have in-flight topics"
                                    + " created during namespace deletion (path " + notEmpty.getPath() + ") "
                                    + "is not empty on metadata store, please try again."));
                });
    }