in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java [1662:1826]
public CompletableFuture<Void> close(
boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
lock.writeLock().lock();
// Choose the close type.
CloseTypes closeType;
if (!disconnectClients) {
closeType = CloseTypes.transferring;
} else if (closeWithoutWaitingClientDisconnect) {
closeType = CloseTypes.notWaitDisconnectClients;
} else {
// closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
// forcefully wants to close managed-ledger without waiting all resources to be closed.
closeType = CloseTypes.waitDisconnectClients;
}
/** Maybe there is a in-progress half closing task. see the section 2-b-1 of {@link CloseFutures}. **/
CompletableFuture<Void> inProgressTransferCloseTask = null;
try {
// Return in-progress future if exists.
if (isClosingOrDeleting) {
if (closeType == CloseTypes.transferring) {
return closeFutures.transferring;
}
if (closeType == CloseTypes.notWaitDisconnectClients && closeFutures.notWaitDisconnectClients != null) {
return closeFutures.notWaitDisconnectClients;
}
if (closeType == CloseTypes.waitDisconnectClients && closeFutures.waitDisconnectClients != null) {
return closeFutures.waitDisconnectClients;
}
if (transferring) {
inProgressTransferCloseTask = closeFutures.transferring;
}
}
fenceTopicToCloseOrDelete();
if (closeType == CloseTypes.transferring) {
transferring = true;
this.closeFutures = new CloseFutures(new CompletableFuture(), null, null);
} else {
this.closeFutures =
new CloseFutures(new CompletableFuture(), new CompletableFuture(), new CompletableFuture());
}
} finally {
lock.writeLock().unlock();
}
List<CompletableFuture<Void>> futures = new ArrayList<>();
if (inProgressTransferCloseTask != null) {
futures.add(inProgressTransferCloseTask);
}
futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate()));
if (closeType != CloseTypes.transferring) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
brokerService.getPulsar(), topic).thenAccept(lookupData -> {
producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData)));
// Topics unloaded due to the ExtensibleLoadManager undergo closing twice: first with
// disconnectClients = false, second with disconnectClients = true. The check below identifies the
// cases when Topic.close is called outside the scope of the ExtensibleLoadManager. In these
// situations, we must pursue the regular Subscription.close, as Topic.close is invoked just once.
if (isTransferring()) {
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect(lookupData)));
} else {
subscriptions.forEach((s, sub) -> futures.add(sub.close(true, lookupData)));
}
}
));
} else {
subscriptions.forEach((s, sub) -> futures.add(sub.close(false, Optional.empty())));
}
//close entry filters
if (entryFilters != null) {
entryFilters.getRight().forEach((filter) -> {
try {
filter.close();
} catch (Throwable e) {
log.warn("Error shutting down entry filter {}", filter, e);
}
});
}
if (topicCompactionService != null) {
try {
topicCompactionService.close();
} catch (Exception e) {
log.warn("Error close topicCompactionService ", e);
}
}
CompletableFuture<Void> disconnectClientsInCurrentCall = null;
// Note: "disconnectClientsToCache" is a non-able value, it is null when close type is transferring.
AtomicReference<CompletableFuture<Void>> disconnectClientsToCache = new AtomicReference<>();
switch (closeType) {
case transferring -> {
disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures);
break;
}
case notWaitDisconnectClients -> {
disconnectClientsInCurrentCall = CompletableFuture.completedFuture(null);
disconnectClientsToCache.set(FutureUtil.waitForAll(futures));
break;
}
case waitDisconnectClients -> {
disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures);
disconnectClientsToCache.set(disconnectClientsInCurrentCall);
}
}
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
Runnable closeLedgerAfterCloseClients = (() -> ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
if (closeType != CloseTypes.transferring) {
// Everything is now closed, remove the topic from map
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
}
@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
if (closeType != CloseTypes.transferring) {
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
}
}, null));
disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
unfenceTopicToResume();
closeFuture.completeExceptionally(exception);
return null;
});
switch (closeType) {
case transferring -> {
FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
break;
}
case notWaitDisconnectClients -> {
FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients,
closeFuture.thenCompose(ignore -> disconnectClientsToCache.get().exceptionally(ex -> {
// Since the managed ledger has been closed, eat the error of clients disconnection.
log.error("[{}] Closed managed ledger, but disconnect clients failed,"
+ " this topic will be marked closed", topic, ex);
return null;
})));
break;
}
case waitDisconnectClients -> {
FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeFuture);
}
}
return closeFuture;
}