private CompletableFuture delete()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java [1474:1637]


    /**
     * Deletes this topic.
     *
     * <p>Under the topic write lock the method validates that deletion is allowed, fences the topic and
     * publishes fresh {@code closeFutures}. The asynchronous part then runs in this order:
     * <ol>
     *   <li>close every subscription and, when {@code closeIfClientsConnected} is set, terminate the
     *       replicators / shadow replicators and disconnect the producers;</li>
     *   <li>delete the topic authentication (up to 5 attempts are requested), then call
     *       {@code deleteSchema()}, {@code deleteTopicPolicies()} and
     *       {@code transactionBufferCleanupAndClose()};</li>
     *   <li>unsubscribe every subscription;</li>
     *   <li>delete the managed ledger, remove the topic from the broker cache, close the rate limiters
     *       and unregister the topic policy listener.</li>
     * </ol>
     * Whenever a step fails, the topic is un-fenced (exactly once) and the returned future completes
     * exceptionally.
     *
     * @param failIfHasSubscriptions  when clients are not being force-closed, fail with a
     *                                {@code TopicBusyException} if the topic has any subscription
     * @param failIfHasBacklogs       when clients are not being force-closed (and the previous check did not
     *                                fail), fail with a {@code TopicBusyException} if any subscription has
     *                                backlog or any producer is connected
     * @param closeIfClientsConnected skip all the busy checks and forcefully close replicators and producers
     * @return a future completed once the topic is deleted; it fails with a {@code TopicFencedException}
     *         when the topic is already being closed or deleted, and with a {@code TopicBusyException}
     *         when a busy check rejects the deletion or the clients could not be closed. Managed ledger
     *         deletion errors are wrapped in a {@code PersistenceException} before being handed to the
     *         delete action's future.
     */
    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
                                           boolean failIfHasBacklogs,
                                           boolean closeIfClientsConnected) {

        lock.writeLock().lock();
        try {
            if (isClosingOrDeleting) {
                log.warn("[{}] Topic is already being closed or deleted", topic);
                return FutureUtil.failedFuture(new TopicFencedException("Topic is already fenced"));
            }
            // We can proceed with the deletion if either:
            //  1. No one is connected and no subscriptions
            //  2. The topic have subscriptions but no backlogs for all subscriptions
            //     if delete_when_no_subscriptions is applied
            //  3. We want to kick out everyone and forcefully delete the topic.
            //     In this case, we shouldn't care if the usageCount is 0 or not, just proceed
            if (!closeIfClientsConnected) {
                if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
                    return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions: "
                            + subscriptions.keySet().stream().toList()));
                } else if (failIfHasBacklogs) {
                    if (hasBacklogs(false)) {
                        List<String> backlogSubs =
                                subscriptions.values().stream()
                                        .filter(sub -> sub.getNumberOfEntriesInBacklog(false) > 0)
                                        .map(PersistentSubscription::getName).toList();
                        return FutureUtil.failedFuture(
                                new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs));
                    } else if (!producers.isEmpty()) {
                        return FutureUtil.failedFuture(new TopicBusyException(
                                "Topic has " + producers.size() + " connected producers"));
                    }
                } else if (currentUsageCount() > 0) {
                    return FutureUtil.failedFuture(new TopicBusyException(
                            "Topic has " + currentUsageCount() + " connected producers/consumers"));
                }
            }

            fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
            // Mark the progress of close to prevent close calling concurrently.
            this.closeFutures = new CloseFutures(
                    new CompletableFuture<>(), new CompletableFuture<>(), new CompletableFuture<>());

            AtomicBoolean alreadyUnFenced = new AtomicBoolean();
            // Single entry point used by every failure path below. A failure while closing the clients
            // fires two handlers back to back (the waitForAll handler and the closeClientFuture handler);
            // the compare-and-set makes sure the topic is un-fenced only once per delete attempt.
            Runnable unfenceOnce = () -> {
                if (alreadyUnFenced.compareAndSet(false, true)) {
                    unfenceTopicToResume();
                }
            };

            // NOTE(review): runWithMarkDeleteAsync presumably flags the topic as "being deleted" in the
            // partitioned-topic metadata while the supplied action runs - confirm against its definition.
            CompletableFuture<Void> res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
                    .getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> {
                CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

                // Step 1: close subscriptions and, if forced, replicators and producers.
                CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
                List<CompletableFuture<Void>> futures = new ArrayList<>();
                subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty())));
                if (closeIfClientsConnected) {
                    replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
                    shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate()));
                    producers.values().forEach(producer -> futures.add(producer.disconnect()));
                }
                FutureUtil.waitForAll(futures).thenRunAsync(() -> {
                    closeClientFuture.complete(null);
                }, command -> {
                    try {
                        getOrderedExecutor().execute(command);
                    } catch (RejectedExecutionException e) {
                        // executor has been shut down, execute in current thread
                        command.run();
                    }
                }).exceptionally(ex -> {
                    log.error("[{}] Error closing clients", topic, ex);
                    unfenceOnce.run();
                    closeClientFuture.completeExceptionally(ex);
                    return null;
                });

                closeClientFuture.thenAccept(__ -> {
                    // Step 2: remove authentication, schema, policies and the transaction buffer.
                    CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
                    brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);

                    deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema())
                            .thenCompose(ignore -> deleteTopicPolicies())
                            .thenCompose(ignore -> transactionBufferCleanupAndClose())
                            .whenComplete((v, ex) -> {
                                if (ex != null) {
                                    log.error("[{}] Error deleting topic", topic, ex);
                                    unfenceOnce.run();
                                    deleteFuture.completeExceptionally(ex);
                                    return;
                                }

                                // Step 3: drop every subscription.
                                List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>();
                                subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub)));

                                FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
                                    if (e != null) {
                                        log.error("[{}] Error deleting topic", topic, e);
                                        unfenceOnce.run();
                                        deleteFuture.completeExceptionally(e);
                                        return;
                                    }

                                    // Step 4: delete the managed ledger and release topic-level resources.
                                    ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
                                        @Override
                                        public void deleteLedgerComplete(Object ctx) {
                                            brokerService.removeTopicFromCache(PersistentTopic.this);

                                            dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

                                            subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);

                                            unregisterTopicPolicyListener();

                                            log.info("[{}] Topic deleted", topic);
                                            deleteFuture.complete(null);
                                        }

                                        @Override
                                        public void deleteLedgerFailed(ManagedLedgerException exception,
                                                                       Object ctx) {
                                            if (exception.getCause()
                                                    instanceof MetadataStoreException.NotFoundException) {
                                                // The ledger metadata is already gone: treat it as a success.
                                                log.info("[{}] Topic is already deleted {}",
                                                        topic, exception.getMessage());
                                                deleteLedgerComplete(ctx);
                                            } else {
                                                log.error("[{}] Error deleting topic", topic, exception);
                                                unfenceOnce.run();
                                                deleteFuture.completeExceptionally(
                                                        new PersistenceException(exception));
                                            }
                                        }
                                    }, null);
                                });
                            });
                }).exceptionally(ex -> {
                    // Reached when closeClientFuture failed (the topic was already un-fenced by the handler
                    // above, so unfenceOnce is a no-op) or when the thenAccept callback itself threw.
                    unfenceOnce.run();
                    deleteFuture.completeExceptionally(
                            new TopicBusyException("Failed to close clients before deleting topic.",
                                    FutureUtil.unwrapCompletionException(ex)));
                    return null;
                });

                return deleteFuture;
            }).whenComplete((value, ex) -> {
                if (ex != null) {
                    log.error("[{}] Error deleting topic", topic, ex);
                    // Covers failures for which no inner handler has un-fenced the topic yet.
                    unfenceOnce.run();
                }
            });

            // Propagate the outcome of the deletion to every close future published above.
            FutureUtil.completeAfter(closeFutures.transferring, res);
            FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, res);
            FutureUtil.completeAfter(closeFutures.waitDisconnectClients, res);
            return res;
        } finally {
            lock.writeLock().unlock();
        }

    }