in pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java [478:740]
public CompletableFuture<Void> closeAsync() {
mutex.lock();
try {
// Close protocol handler before unloading namespace bundles because protocol handlers might maintain
// Pulsar clients that could send lookup requests that affect unloading.
if (protocolHandlers != null) {
protocolHandlers.close();
protocolHandlers = null;
}
if (closeFuture != null) {
return closeFuture;
}
LOG.info("Closing PulsarService");
if (topicPoliciesService != null) {
topicPoliciesService.close();
}
if (brokerService != null) {
brokerService.unloadNamespaceBundlesGracefully();
}
// It only tells the Pulsar clients that this service is not ready to serve for the lookup requests
state = State.Closing;
if (healthChecker != null) {
healthChecker.close();
healthChecker = null;
}
// close the service in reverse order v.s. in which they are started
if (this.resourceUsageTransportManager != null) {
try {
this.resourceUsageTransportManager.close();
} catch (Exception e) {
LOG.warn("ResourceUsageTransportManager closing failed {}", e.getMessage());
}
this.resourceUsageTransportManager = null;
}
if (this.resourceGroupServiceManager != null) {
try {
this.resourceGroupServiceManager.close();
} catch (Exception e) {
LOG.warn("ResourceGroupServiceManager closing failed {}", e.getMessage());
}
this.resourceGroupServiceManager = null;
}
if (this.webService != null) {
try {
this.webService.close();
this.webService = null;
} catch (Exception e) {
LOG.error("Web service closing failed", e);
// Even if the web service fails to close, the graceful shutdown process continues
}
}
resetMetricsServlet();
if (openTelemetry != null) {
openTelemetry.close();
}
if (this.compactionServiceFactory != null) {
try {
this.compactionServiceFactory.close();
} catch (Exception e) {
LOG.warn("CompactionServiceFactory closing failed {}", e.getMessage());
}
this.compactionServiceFactory = null;
}
if (this.webSocketService != null) {
this.webSocketService.close();
}
if (brokerAdditionalServlets != null) {
brokerAdditionalServlets.close();
brokerAdditionalServlets = null;
}
GracefulExecutorServicesShutdown executorServicesShutdown =
GracefulExecutorServicesShutdown
.initiate()
.timeout(
Duration.ofMillis(
(long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
* getConfiguration()
.getBrokerShutdownTimeoutMs())));
// cancel loadShedding task and shutdown the loadManager executor before shutting down the broker
cancelLoadBalancerTasks();
executorServicesShutdown.shutdown(loadManagerExecutor);
List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
if (this.brokerService != null) {
CompletableFuture<Void> brokerCloseFuture = this.brokerService.closeAsync();
if (this.transactionMetadataStoreService != null) {
asyncCloseFutures.add(brokerCloseFuture.whenComplete((__, ___) -> {
// close transactionMetadataStoreService after the broker has been closed
this.transactionMetadataStoreService.close();
this.transactionMetadataStoreService = null;
}));
} else {
asyncCloseFutures.add(brokerCloseFuture);
}
this.brokerService = null;
}
if (this.managedLedgerStorage != null) {
try {
this.managedLedgerStorage.close();
} catch (Exception e) {
LOG.warn("ManagedLedgerClientFactory closing failed {}", e.getMessage());
}
this.managedLedgerStorage = null;
}
if (bkClientFactory != null) {
this.bkClientFactory.close();
this.bkClientFactory = null;
}
closeLeaderElectionService();
if (adminClient != null) {
adminClient.close();
adminClient = null;
}
if (transactionBufferSnapshotServiceFactory != null) {
transactionBufferSnapshotServiceFactory.close();
transactionBufferSnapshotServiceFactory = null;
}
if (transactionBufferClient != null) {
transactionBufferClient.close();
}
if (client != null) {
client.close();
client = null;
}
if (nsService != null) {
nsService.close();
nsService = null;
}
executorServicesShutdown.shutdown(compactorExecutor);
executorServicesShutdown.shutdown(offloaderScheduler);
executorServicesShutdown.shutdown(offloaderReadExecutor);
executorServicesShutdown.shutdown(executor);
executorServicesShutdown.shutdown(orderedExecutor);
LoadManager loadManager = this.loadManager.get();
if (loadManager != null) {
loadManager.stop();
}
if (schemaRegistryService != null) {
schemaRegistryService.close();
}
offloadersCache.close();
if (coordinationService != null) {
try {
coordinationService.close();
} catch (Exception e) {
Throwable cause = FutureUtil.unwrapCompletionException(e);
if (!(cause instanceof MetadataStoreException.AlreadyClosedException)) {
throw e;
}
}
}
asyncCloseFutures.add(closeLocalMetadataStore());
if (configMetadataSynchronizer != null) {
asyncCloseFutures.add(configMetadataSynchronizer.closeAsync());
}
if (configurationMetadataStore != null && shouldShutdownConfigurationMetadataStore) {
configurationMetadataStore.close();
}
if (transactionExecutorProvider != null) {
transactionExecutorProvider.shutdownNow();
}
MLPendingAckStoreProvider.closeBufferedWriterMetrics();
MLTransactionMetadataStoreProvider.closeBufferedWriterMetrics();
if (this.offloaderStats != null) {
this.offloaderStats.close();
}
brokerClientSharedExternalExecutorProvider.shutdownNow();
brokerClientSharedInternalExecutorProvider.shutdownNow();
brokerClientSharedScheduledExecutorProvider.shutdownNow();
brokerClientSharedLookupExecutorProvider.shutdownNow();
brokerClientSharedTimer.stop();
if (monotonicClock instanceof AutoCloseable c) {
c.close();
}
if (openTelemetryTransactionPendingAckStoreStats != null) {
openTelemetryTransactionPendingAckStoreStats.close();
openTelemetryTransactionPendingAckStoreStats = null;
}
if (openTelemetryTransactionCoordinatorStats != null) {
openTelemetryTransactionCoordinatorStats.close();
openTelemetryTransactionCoordinatorStats = null;
}
if (openTelemetryReplicatorStats != null) {
openTelemetryReplicatorStats.close();
openTelemetryReplicatorStats = null;
}
if (openTelemetryProducerStats != null) {
openTelemetryProducerStats.close();
openTelemetryProducerStats = null;
}
if (openTelemetryConsumerStats != null) {
openTelemetryConsumerStats.close();
openTelemetryConsumerStats = null;
}
if (openTelemetryTopicStats != null) {
openTelemetryTopicStats.close();
openTelemetryTopicStats = null;
}
asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));
// add timeout handling for closing executors
asyncCloseFutures.add(executorServicesShutdown.handle());
closeFuture = addTimeoutHandling(FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures));
closeFuture.handle((v, t) -> {
if (t == null) {
LOG.info("Closed");
} else if (t instanceof CancellationException) {
LOG.info("Closed (shutdown cancelled)");
} else if (t instanceof TimeoutException) {
LOG.info("Closed (shutdown timeout)");
} else {
LOG.warn("Closed with errors", t);
}
state = State.Closed;
isClosedCondition.signalAll();
return null;
});
return closeFuture;
} catch (Exception e) {
PulsarServerException pse;
if (e instanceof CompletionException && e.getCause() instanceof MetadataStoreException) {
pse = new PulsarServerException(MetadataStoreException.unwrap(e));
} else if (e.getCause() instanceof CompletionException
&& e.getCause().getCause() instanceof MetadataStoreException) {
pse = new PulsarServerException(MetadataStoreException.unwrap(e.getCause()));
} else {
pse = new PulsarServerException(e);
}
return FutureUtil.failedFuture(pse);
} finally {
mutex.unlock();
}
}