public CompletableFuture<Void> closeAsync()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java [478:740]


    /**
     * Shuts this service down asynchronously.
     *
     * <p>The whole method body runs while holding {@code mutex}. Protocol handlers are closed first;
     * then, if an earlier invocation already stored a {@code closeFuture}, that same future is returned
     * and nothing else is done. Otherwise every non-null component is closed synchronously in the order
     * written below, the asynchronous close operations are collected, and their combination (wrapped by
     * {@code addTimeoutHandling}) is stored in {@code closeFuture} and returned. When that future
     * completes, {@code state} becomes {@code State.Closed} and waiters on {@code isClosedCondition}
     * are signalled.
     *
     * @return the future tracking the asynchronous part of the shutdown; if one of the synchronous
     *         steps throws, a future already failed with a {@code PulsarServerException} is returned
     *         instead and {@code closeFuture} is left unset
     */
    public CompletableFuture<Void> closeAsync() {
        mutex.lock();
        try {
            // Close protocol handler before unloading namespace bundles because protocol handlers might maintain
            // Pulsar clients that could send lookup requests that affect unloading.
            if (protocolHandlers != null) {
                protocolHandlers.close();
                protocolHandlers = null;
            }
            // A close is already in progress (or finished): hand back the same future.
            if (closeFuture != null) {
                return closeFuture;
            }
            LOG.info("Closing PulsarService");
            if (topicPoliciesService != null) {
                topicPoliciesService.close();
            }
            if (brokerService != null) {
                brokerService.unloadNamespaceBundlesGracefully();
            }
            // It only tells the Pulsar clients that this service is not ready to serve for the lookup requests
            state = State.Closing;

            if (healthChecker != null) {
                healthChecker.close();
                healthChecker = null;
            }

            // close the service in reverse order v.s. in which they are started
            if (this.resourceUsageTransportManager != null) {
                try {
                    this.resourceUsageTransportManager.close();
                } catch (Exception e) {
                    LOG.warn("ResourceUsageTransportManager closing failed {}", e.getMessage());
                }
                this.resourceUsageTransportManager = null;
            }
            if (this.resourceGroupServiceManager != null) {
                try {
                    this.resourceGroupServiceManager.close();
                } catch (Exception e) {
                    LOG.warn("ResourceGroupServiceManager closing failed {}", e.getMessage());
                }
                this.resourceGroupServiceManager = null;
            }

            if (this.webService != null) {
                try {
                    this.webService.close();
                    this.webService = null;
                } catch (Exception e) {
                    LOG.error("Web service closing failed", e);
                    // Even if the web service fails to close, the graceful shutdown process continues
                }
            }

            resetMetricsServlet();
            if (openTelemetry != null) {
                openTelemetry.close();
            }

            if (this.compactionServiceFactory != null) {
                try {
                    this.compactionServiceFactory.close();
                } catch (Exception e) {
                    LOG.warn("CompactionServiceFactory closing failed {}", e.getMessage());
                }
                this.compactionServiceFactory = null;
            }

            if (this.webSocketService != null) {
                this.webSocketService.close();
            }

            if (brokerAdditionalServlets != null) {
                brokerAdditionalServlets.close();
                brokerAdditionalServlets = null;
            }

            // The executor shutdown helper gets a fraction of the configured broker shutdown timeout.
            GracefulExecutorServicesShutdown executorServicesShutdown =
                    GracefulExecutorServicesShutdown
                            .initiate()
                            .timeout(
                                    Duration.ofMillis(
                                            (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
                                                    * getConfiguration()
                                                    .getBrokerShutdownTimeoutMs())));

            // cancel loadShedding task and shutdown the loadManager executor before shutting down the broker
            cancelLoadBalancerTasks();
            executorServicesShutdown.shutdown(loadManagerExecutor);

            // Futures of every asynchronous close step; they are combined into closeFuture at the end.
            List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
            if (this.brokerService != null) {
                CompletableFuture<Void> brokerCloseFuture = this.brokerService.closeAsync();
                if (this.transactionMetadataStoreService != null) {
                    asyncCloseFutures.add(brokerCloseFuture.whenComplete((__, ___) -> {
                        // close transactionMetadataStoreService after the broker has been closed
                        this.transactionMetadataStoreService.close();
                        this.transactionMetadataStoreService = null;
                    }));
                } else {
                    asyncCloseFutures.add(brokerCloseFuture);
                }
                this.brokerService = null;
            }

            if (this.managedLedgerStorage != null) {
                try {
                    this.managedLedgerStorage.close();
                } catch (Exception e) {
                    LOG.warn("ManagedLedgerClientFactory closing failed {}", e.getMessage());
                }
                this.managedLedgerStorage = null;
            }

            if (bkClientFactory != null) {
                this.bkClientFactory.close();
                this.bkClientFactory = null;
            }

            closeLeaderElectionService();

            if (adminClient != null) {
                adminClient.close();
                adminClient = null;
            }

            if (transactionBufferSnapshotServiceFactory != null) {
                transactionBufferSnapshotServiceFactory.close();
                transactionBufferSnapshotServiceFactory = null;
            }

            if (transactionBufferClient != null) {
                transactionBufferClient.close();
            }


            if (client != null) {
                client.close();
                client = null;
            }

            if (nsService != null) {
                nsService.close();
                nsService = null;
            }

            executorServicesShutdown.shutdown(compactorExecutor);
            executorServicesShutdown.shutdown(offloaderScheduler);
            executorServicesShutdown.shutdown(offloaderReadExecutor);
            executorServicesShutdown.shutdown(executor);
            executorServicesShutdown.shutdown(orderedExecutor);

            LoadManager loadManager = this.loadManager.get();
            if (loadManager != null) {
                loadManager.stop();
            }

            if (schemaRegistryService != null) {
                schemaRegistryService.close();
            }

            offloadersCache.close();

            if (coordinationService != null) {
                try {
                    coordinationService.close();
                } catch (Exception e) {
                    // An already-closed metadata store is tolerated; anything else aborts the shutdown
                    // and is converted to a failed future by the outer catch.
                    Throwable cause = FutureUtil.unwrapCompletionException(e);
                    if (!(cause instanceof MetadataStoreException.AlreadyClosedException)) {
                        throw e;
                    }
                }
            }

            asyncCloseFutures.add(closeLocalMetadataStore());
            if (configMetadataSynchronizer != null) {
                asyncCloseFutures.add(configMetadataSynchronizer.closeAsync());
            }
            if (configurationMetadataStore != null && shouldShutdownConfigurationMetadataStore) {
                configurationMetadataStore.close();
            }

            if (transactionExecutorProvider != null) {
                transactionExecutorProvider.shutdownNow();
            }
            MLPendingAckStoreProvider.closeBufferedWriterMetrics();
            MLTransactionMetadataStoreProvider.closeBufferedWriterMetrics();
            if (this.offloaderStats != null) {
                this.offloaderStats.close();
            }

            brokerClientSharedExternalExecutorProvider.shutdownNow();
            brokerClientSharedInternalExecutorProvider.shutdownNow();
            brokerClientSharedScheduledExecutorProvider.shutdownNow();
            brokerClientSharedLookupExecutorProvider.shutdownNow();
            brokerClientSharedTimer.stop();
            if (monotonicClock instanceof AutoCloseable c) {
                c.close();
            }

            if (openTelemetryTransactionPendingAckStoreStats != null) {
                openTelemetryTransactionPendingAckStoreStats.close();
                openTelemetryTransactionPendingAckStoreStats = null;
            }
            if (openTelemetryTransactionCoordinatorStats != null) {
                openTelemetryTransactionCoordinatorStats.close();
                openTelemetryTransactionCoordinatorStats = null;
            }
            if (openTelemetryReplicatorStats != null) {
                openTelemetryReplicatorStats.close();
                openTelemetryReplicatorStats = null;
            }
            if (openTelemetryProducerStats != null) {
                openTelemetryProducerStats.close();
                openTelemetryProducerStats = null;
            }
            if (openTelemetryConsumerStats != null) {
                openTelemetryConsumerStats.close();
                openTelemetryConsumerStats = null;
            }
            if (openTelemetryTopicStats != null) {
                openTelemetryTopicStats.close();
                openTelemetryTopicStats = null;
            }

            asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));


            // add timeout handling for closing executors
            asyncCloseFutures.add(executorServicesShutdown.handle());

            closeFuture = addTimeoutHandling(FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures));
            closeFuture.handle((v, t) -> {
                if (t == null) {
                    LOG.info("Closed");
                } else if (t instanceof CancellationException) {
                    LOG.info("Closed (shutdown cancelled)");
                } else if (t instanceof TimeoutException) {
                    LOG.info("Closed (shutdown timeout)");
                } else {
                    LOG.warn("Closed with errors", t);
                }
                // This callback runs on whichever thread completes closeFuture, which may not be the
                // thread that called closeAsync(). Signalling a lock-bound Condition without owning its
                // lock throws IllegalMonitorStateException; that exception would end up in the discarded
                // future returned by handle() and the waiters would never be woken. Take the mutex here.
                // NOTE(review): assumes isClosedCondition was created from mutex and that mutex is
                // reentrant (the callback runs inline, under the lock, when the future is already
                // complete) - confirm against the field declarations.
                mutex.lock();
                try {
                    state = State.Closed;
                    isClosedCondition.signalAll();
                } finally {
                    mutex.unlock();
                }
                return null;
            });
            return closeFuture;
        } catch (Exception e) {
            // Surface the underlying MetadataStoreException (direct or one level down) when there is one;
            // otherwise wrap the exception as-is.
            PulsarServerException pse;
            if (e instanceof CompletionException && e.getCause() instanceof MetadataStoreException) {
                pse = new PulsarServerException(MetadataStoreException.unwrap(e));
            } else if (e.getCause() instanceof CompletionException
                    && e.getCause().getCause() instanceof MetadataStoreException) {
                pse = new PulsarServerException(MetadataStoreException.unwrap(e.getCause()));
            } else {
                pse = new PulsarServerException(e);
            }
            return FutureUtil.failedFuture(pse);
        } finally {
            mutex.unlock();
        }
    }