in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java [1474:1637]
/**
 * Deletes this topic.
 *
 * <p>While holding the topic write lock, this method rejects the request if the topic is already being
 * closed or deleted, optionally verifies that nothing is still using the topic, fences the topic and wires
 * up the asynchronous deletion pipeline. The pipeline itself is run through
 * {@code runWithMarkDeleteAsync} and consists of, in order: closing the subscriptions (plus terminating
 * replicators and disconnecting producers when {@code closeIfClientsConnected} is set), deleting the topic
 * authentication, {@code deleteSchema()}, {@code deleteTopicPolicies()},
 * {@code transactionBufferCleanupAndClose()}, unsubscribing every subscription and finally
 * {@code ledger.asyncDelete(...)}. Stages that complete after this method has returned run outside the lock.
 *
 * <p>Whenever a stage fails, {@code unfenceTopicToResume()} is called before the failure is propagated.
 *
 * @param failIfHasSubscriptions  when {@code true} (and {@code closeIfClientsConnected} is {@code false}),
 *                                refuse to delete if the topic has any subscription
 * @param failIfHasBacklogs       when {@code true} (and {@code closeIfClientsConnected} is {@code false}),
 *                                refuse to delete if {@code hasBacklogs(false)} reports a backlog or if
 *                                producers are connected; when {@code false}, refuse if
 *                                {@code currentUsageCount()} is positive
 * @param closeIfClientsConnected when {@code true}, skip all of the checks above and additionally terminate
 *                                replicators / shadow replicators and disconnect producers
 * @return a future completed with {@code null} once the ledger has been deleted and the topic removed from
 *         the broker cache; completed exceptionally with {@code TopicFencedException} if a close/delete is
 *         already in progress, {@code TopicBusyException} if a precondition fails or the clients cannot be
 *         closed, {@code PersistenceException} if the ledger deletion fails, or the underlying error of any
 *         other failing stage
 */
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
                                       boolean failIfHasBacklogs,
                                       boolean closeIfClientsConnected) {
    lock.writeLock().lock();
    try {
        if (isClosingOrDeleting) {
            log.warn("[{}] Topic is already being closed or deleted", topic);
            return FutureUtil.failedFuture(new TopicFencedException("Topic is already fenced"));
        }
        // We can proceed with the deletion if either:
        //  1. No one is connected and there are no subscriptions
        //  2. The topic has subscriptions but none of them has a backlog
        //     (if delete_when_no_subscriptions is applied)
        //  3. We want to kick out everyone and forcefully delete the topic.
        //     In this case, we shouldn't care if the usageCount is 0 or not, just proceed
        if (!closeIfClientsConnected) {
            Optional<TopicBusyException> blocker = findDeleteBlocker(failIfHasSubscriptions, failIfHasBacklogs);
            if (blocker.isPresent()) {
                return FutureUtil.failedFuture(blocker.get());
            }
        }

        fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
        // Mark the progress of close to prevent close calling concurrently.
        this.closeFutures = new CloseFutures(
                new CompletableFuture<>(), new CompletableFuture<>(), new CompletableFuture<>());

        // Set by every stage that has already unfenced the topic, so that the final handler below does not
        // unfence a second time for the same failure.
        AtomicBoolean alreadyUnFenced = new AtomicBoolean();
        CompletableFuture<Void> res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
                .getPartitionedTopicResources()
                .runWithMarkDeleteAsync(TopicName.get(topic),
                        () -> closeClientsThenDeleteTopic(closeIfClientsConnected, alreadyUnFenced))
                .whenComplete((value, ex) -> {
                    if (ex != null) {
                        log.error("[{}] Error deleting topic", topic, ex);
                        // Covers failures that did not go through one of the pipeline stages.
                        if (!alreadyUnFenced.get()) {
                            unfenceTopicToResume();
                        }
                    }
                });

        // All three close-progress futures follow the outcome of the deletion.
        FutureUtil.completeAfter(closeFutures.transferring, res);
        FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, res);
        FutureUtil.completeAfter(closeFutures.waitDisconnectClients, res);
        return res;
    } finally {
        lock.writeLock().unlock();
    }
}

/**
 * Evaluates the non-forced delete preconditions.
 *
 * @param failIfHasSubscriptions whether the mere existence of a subscription blocks the delete
 * @param failIfHasBacklogs      whether backlogs / connected producers (rather than the usage count) are
 *                               what blocks the delete
 * @return the exception describing why the topic must not be deleted, or empty if the delete may proceed
 */
private Optional<TopicBusyException> findDeleteBlocker(boolean failIfHasSubscriptions,
                                                       boolean failIfHasBacklogs) {
    if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
        return Optional.of(new TopicBusyException("Topic has subscriptions: "
                + subscriptions.keySet().stream().toList()));
    }
    if (failIfHasBacklogs) {
        if (hasBacklogs(false)) {
            List<String> backlogSubs = subscriptions.values().stream()
                    .filter(sub -> sub.getNumberOfEntriesInBacklog(false) > 0)
                    .map(PersistentSubscription::getName)
                    .toList();
            return Optional.of(
                    new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs));
        }
        if (!producers.isEmpty()) {
            return Optional.of(new TopicBusyException(
                    "Topic has " + producers.size() + " connected producers"));
        }
        return Optional.empty();
    }
    // Read the counter once so the check and the reported value cannot disagree.
    long usageCount = currentUsageCount();
    if (usageCount > 0) {
        return Optional.of(new TopicBusyException(
                "Topic has " + usageCount + " connected producers/consumers"));
    }
    return Optional.empty();
}

/**
 * First stage of the deletion pipeline: closes every subscription (and, when forced, terminates the
 * replicators and disconnects the producers), then hands over to the metadata/ledger deletion.
 *
 * @param closeIfClientsConnected whether replicators and producers must be shut down as well
 * @param alreadyUnFenced         flag raised whenever this pipeline unfences the topic after a failure
 * @return the future that is completed when the whole deletion has finished or failed
 */
private CompletableFuture<Void> closeClientsThenDeleteTopic(boolean closeIfClientsConnected,
                                                            AtomicBoolean alreadyUnFenced) {
    CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
    CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();

    List<CompletableFuture<Void>> futures = new ArrayList<>();
    subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty())));
    if (closeIfClientsConnected) {
        replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
        shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate()));
        producers.values().forEach(producer -> futures.add(producer.disconnect()));
    }

    FutureUtil.waitForAll(futures).thenRunAsync(() -> closeClientFuture.complete(null), command -> {
        try {
            getOrderedExecutor().execute(command);
        } catch (RejectedExecutionException e) {
            // executor has been shut down, execute in current thread
            command.run();
        }
    }).exceptionally(ex -> {
        log.error("[{}] Error closing clients", topic, ex);
        // NOTE(review): failing closeClientFuture also triggers the handler attached to it below, which
        // calls unfenceTopicToResume() a second time. The extra call looks redundant; confirm that
        // unfenceTopicToResume() is idempotent before removing either one.
        unfenceAndFail(closeClientFuture, alreadyUnFenced, ex);
        return null;
    });

    closeClientFuture.thenAccept(__ -> deleteTopicMetadataThenLedger(deleteFuture, alreadyUnFenced))
            .exceptionally(ex -> {
                // Reached when closing the clients failed, or when starting the next stage threw.
                unfenceAndFail(deleteFuture, alreadyUnFenced,
                        new TopicBusyException("Failed to close clients before deleting topic.",
                                FutureUtil.unwrapCompletionException(ex)));
                return null;
            });
    return deleteFuture;
}

/**
 * Second stage of the deletion pipeline: deletes the topic authentication, schema and topic policies,
 * cleans up the transaction buffer, unsubscribes every subscription and, if all of that succeeded,
 * proceeds to delete the managed ledger.
 *
 * @param deleteFuture    future to fail if any step of this stage fails
 * @param alreadyUnFenced flag raised whenever this pipeline unfences the topic after a failure
 */
private void deleteTopicMetadataThenLedger(CompletableFuture<Void> deleteFuture,
                                           AtomicBoolean alreadyUnFenced) {
    CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
    // The trailing 5 is presumably the retry budget - confirm against BrokerService.
    brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
    deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema())
            .thenCompose(ignore -> deleteTopicPolicies())
            .thenCompose(ignore -> transactionBufferCleanupAndClose())
            .whenComplete((v, ex) -> {
                if (ex != null) {
                    log.error("[{}] Error deleting topic", topic, ex);
                    unfenceAndFail(deleteFuture, alreadyUnFenced, ex);
                    return;
                }
                List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>();
                subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub)));
                FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
                    if (e != null) {
                        log.error("[{}] Error deleting topic", topic, e);
                        unfenceAndFail(deleteFuture, alreadyUnFenced, e);
                    } else {
                        deleteManagedLedgerThenCleanup(deleteFuture, alreadyUnFenced);
                    }
                });
            });
}

/**
 * Final stage of the deletion pipeline: deletes the managed ledger and, on success, removes the topic from
 * the broker cache, closes the rate limiters and unregisters the topic policy listener.
 *
 * @param deleteFuture    future completed with {@code null} on success, or with a
 *                        {@code PersistenceException} if the ledger deletion fails
 * @param alreadyUnFenced flag raised whenever this pipeline unfences the topic after a failure
 */
private void deleteManagedLedgerThenCleanup(CompletableFuture<Void> deleteFuture,
                                            AtomicBoolean alreadyUnFenced) {
    ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
        @Override
        public void deleteLedgerComplete(Object ctx) {
            brokerService.removeTopicFromCache(PersistentTopic.this);
            dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
            subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
            unregisterTopicPolicyListener();
            log.info("[{}] Topic deleted", topic);
            deleteFuture.complete(null);
        }

        @Override
        public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
            if (exception.getCause() instanceof MetadataStoreException.NotFoundException) {
                // A missing metadata entry is handled exactly like a successful deletion.
                log.info("[{}] Topic is already deleted {}", topic, exception.getMessage());
                deleteLedgerComplete(ctx);
            } else {
                log.error("[{}] Error deleting topic", topic, exception);
                unfenceAndFail(deleteFuture, alreadyUnFenced, new PersistenceException(exception));
            }
        }
    }, null);
}

/**
 * Common failure path of the deletion pipeline: records that the topic has been unfenced, unfences it and
 * then fails {@code target}.
 *
 * @param target          future to complete exceptionally
 * @param alreadyUnFenced flag to raise so that later handlers do not unfence again
 * @param cause           the failure to propagate
 */
private void unfenceAndFail(CompletableFuture<Void> target, AtomicBoolean alreadyUnFenced, Throwable cause) {
    alreadyUnFenced.set(true);
    unfenceTopicToResume();
    target.completeExceptionally(cause);
}