public CompletableFuture close()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java [1662:1826]


    public CompletableFuture<Void> close(
            boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
        lock.writeLock().lock();
        // Choose the close type.
        CloseTypes closeType;
        if (!disconnectClients) {
            closeType = CloseTypes.transferring;
        } else if (closeWithoutWaitingClientDisconnect) {
            closeType = CloseTypes.notWaitDisconnectClients;
        } else {
            // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
            // forcefully wants to close managed-ledger without waiting all resources to be closed.
            closeType = CloseTypes.waitDisconnectClients;
        }
        /** Maybe there is a in-progress half closing task. see the section 2-b-1 of {@link CloseFutures}. **/
        CompletableFuture<Void> inProgressTransferCloseTask = null;
        try {
            // Return in-progress future if exists.
            if (isClosingOrDeleting) {
                if (closeType == CloseTypes.transferring) {
                    return closeFutures.transferring;
                }
                if (closeType == CloseTypes.notWaitDisconnectClients && closeFutures.notWaitDisconnectClients != null) {
                    return closeFutures.notWaitDisconnectClients;
                }
                if (closeType == CloseTypes.waitDisconnectClients && closeFutures.waitDisconnectClients != null) {
                    return closeFutures.waitDisconnectClients;
                }
                if (transferring) {
                    inProgressTransferCloseTask = closeFutures.transferring;
                }
            }
            fenceTopicToCloseOrDelete();
            if (closeType == CloseTypes.transferring) {
                transferring = true;
                this.closeFutures = new CloseFutures(new CompletableFuture(), null, null);
            } else {
                this.closeFutures =
                        new CloseFutures(new CompletableFuture(), new CompletableFuture(), new CompletableFuture());
            }
        } finally {
            lock.writeLock().unlock();
        }

        List<CompletableFuture<Void>> futures = new ArrayList<>();
        if (inProgressTransferCloseTask != null) {
            futures.add(inProgressTransferCloseTask);
        }

        futures.add(transactionBuffer.closeAsync());
        replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
        shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate()));
        if (closeType != CloseTypes.transferring) {
            futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
                brokerService.getPulsar(), topic).thenAccept(lookupData -> {
                    producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData)));
                    // Topics unloaded due to the ExtensibleLoadManager undergo closing twice: first with
                    // disconnectClients = false, second with disconnectClients = true. The check below identifies the
                    // cases when Topic.close is called outside the scope of the ExtensibleLoadManager. In these
                    // situations, we must pursue the regular Subscription.close, as Topic.close is invoked just once.
                    if (isTransferring()) {
                        subscriptions.forEach((s, sub) -> futures.add(sub.disconnect(lookupData)));
                    } else {
                        subscriptions.forEach((s, sub) -> futures.add(sub.close(true, lookupData)));
                    }
                }
            ));
        } else {
            subscriptions.forEach((s, sub) -> futures.add(sub.close(false, Optional.empty())));
        }

        //close entry filters
        if (entryFilters != null) {
            entryFilters.getRight().forEach((filter) -> {
                try {
                    filter.close();
                } catch (Throwable e) {
                    log.warn("Error shutting down entry filter {}", filter, e);
                }
            });
        }

        if (topicCompactionService != null) {
            try {
                topicCompactionService.close();
            } catch (Exception e) {
                log.warn("Error close topicCompactionService ", e);
            }
        }

        CompletableFuture<Void> disconnectClientsInCurrentCall = null;
        // Note: "disconnectClientsToCache" is a non-able value, it is null when close type is transferring.
        AtomicReference<CompletableFuture<Void>> disconnectClientsToCache = new AtomicReference<>();
        switch (closeType) {
            case transferring -> {
                disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures);
                break;
            }
            case notWaitDisconnectClients -> {
                disconnectClientsInCurrentCall = CompletableFuture.completedFuture(null);
                disconnectClientsToCache.set(FutureUtil.waitForAll(futures));
                break;
            }
            case waitDisconnectClients -> {
                disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures);
                disconnectClientsToCache.set(disconnectClientsInCurrentCall);
            }
        }

        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        Runnable closeLedgerAfterCloseClients = (() -> ledger.asyncClose(new CloseCallback() {
            @Override
            public void closeComplete(Object ctx) {
                if (closeType != CloseTypes.transferring) {
                    // Everything is now closed, remove the topic from map
                    disposeTopic(closeFuture);
                } else {
                    closeFuture.complete(null);
                }
            }

            @Override
            public void closeFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
                if (closeType != CloseTypes.transferring) {
                    disposeTopic(closeFuture);
                } else {
                    closeFuture.complete(null);
                }
            }
        }, null));

        disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception -> {
            log.error("[{}] Error closing topic", topic, exception);
            unfenceTopicToResume();
            closeFuture.completeExceptionally(exception);
            return null;
        });

        switch (closeType) {
            case transferring -> {
                FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
                break;
            }
            case notWaitDisconnectClients -> {
                FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
                FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
                FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients,
                        closeFuture.thenCompose(ignore -> disconnectClientsToCache.get().exceptionally(ex -> {
                            // Since the managed ledger has been closed, eat the error of clients disconnection.
                            log.error("[{}] Closed managed ledger, but disconnect clients failed,"
                                    + " this topic will be marked closed", topic, ex);
                            return null;
                        })));
                break;
            }
            case waitDisconnectClients -> {
                FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
                FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
                FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeFuture);
            }
        }

        return closeFuture;
    }