in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java [1769:1928]
/**
 * Asynchronously opens the managed ledger behind {@code topic}, builds the persistent topic on top of
 * it and reports the outcome through {@code topicFuture}.
 *
 * <p>Outcomes produced by this method:
 * <ul>
 *   <li>{@code Optional.of(persistentTopic)} once the ledger is open and the topic's initialization,
 *       compaction-subscription pre-creation, replication check and deduplication check have all
 *       succeeded;</li>
 *   <li>{@code Optional.empty()} when {@code createIfMissing} is {@code false} and opening the ledger
 *       fails with {@code ManagedLedgerNotFoundException};</li>
 *   <li>exceptional completion otherwise: {@code NotAllowedException} for transaction-internal topic
 *       names, {@code PersistenceException} wrapping any other ledger-open failure, or the failure
 *       raised by the pre-checks, the configuration lookup or the topic initialization chain.</li>
 * </ul>
 *
 * <p>On every failure path the entry is dropped with {@code topics.remove(topic, topicFuture)}, always
 * from an executor task or from a callback of the topic's {@code close()} future rather than inline.
 * Presumably this is a conditional remove that only drops the entry while it still maps to this exact
 * future - confirm against the declared type of {@code topics}.
 *
 * @param topic           full name of the topic to load or create
 * @param createIfMissing when {@code true}, the max-topics-per-namespace check runs, {@code CREATE}
 *                        topic events are dispatched and the flag is forwarded to the managed ledger
 *                        configuration
 * @param topicFuture     future completed by this method with the result; it is also handed to the new
 *                        topic via {@code setCreateFuture}
 * @param properties      properties stored on the managed ledger configuration via
 *                        {@code setProperties}
 * @param topicPolicies   policies forwarded to {@code getManagedLedgerConfig}; may be {@code null}
 */
private void createPersistentTopic(final String topic, boolean createIfMissing,
CompletableFuture<Optional<Topic>> topicFuture,
Map<String, String> properties, @Nullable TopicPolicies topicPolicies) {
TopicName topicName = TopicName.get(topic);
// Start mark for the topic-load latency recorded on success. It is derived from System.nanoTime(),
// so it is only meaningful as a difference against a later nanoTime()-based reading.
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
// Transaction-internal topic names are rejected up front, before any asynchronous work starts.
if (isTransactionInternalName(topicName)) {
String msg = String.format("Can not create transaction system topic %s", topic);
log.warn(msg);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(new NotAllowedException(msg));
return;
}
// The namespace topic-count limit is only checked when this call is allowed to create the topic.
CompletableFuture<Void> maxTopicsCheck = createIfMissing
? checkMaxTopicsPerNamespace(topicName, 1)
: CompletableFuture.completedFuture(null);
// The migration check is started eagerly here; its result is only consumed further down the chain,
// after the max-topics check and the consistency validation have completed.
CompletableFuture<Void> isTopicAlreadyMigrated = checkTopicAlreadyMigrated(topicName);
// Note: the lambda parameter below is the Void result of maxTopicsCheck, despite its name.
maxTopicsCheck.thenCompose(partitionedTopicMetadata -> validateTopicConsistency(topicName))
.thenCompose(__ -> isTopicAlreadyMigrated)
.thenCompose(__ -> getManagedLedgerConfig(topicName, topicPolicies))
.thenAccept(managedLedgerConfig -> {
if (isBrokerEntryMetadataEnabled() || isBrokerPayloadProcessorEnabled()) {
// init managedLedger interceptor
Set<BrokerEntryMetadataInterceptor> interceptors = new HashSet<>();
for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
// add an individual AppendIndexMetadataInterceptor for each topic instead of sharing the
// broker-wide instance; every other interceptor instance is reused as-is
if (interceptor instanceof AppendIndexMetadataInterceptor) {
interceptors.add(new AppendIndexMetadataInterceptor());
} else {
interceptors.add(interceptor);
}
}
managedLedgerConfig.setManagedLedgerInterceptor(
new ManagedLedgerInterceptorImpl(interceptors, brokerEntryPayloadProcessors));
}
managedLedgerConfig.setCreateIfMissing(createIfMissing);
managedLedgerConfig.setProperties(properties);
// When a shadow source is configured, also store its persistence-encoded name on the config.
String shadowSource = managedLedgerConfig.getShadowSource();
if (shadowSource != null) {
managedLedgerConfig.setShadowSourceName(TopicName.get(shadowSource).getPersistenceNamingEncoding());
}
topicEventsDispatcher.notify(topic, TopicEvent.LOAD, EventStage.BEFORE);
// load can fail with topicFuture completed non-exceptionally
// work around this
// loadFuture mirrors topicFuture, but openLedgerFailed below can fail it on its own before
// topicFuture is completed with an empty Optional; the mirror callback is then a no-op because
// the future is already completed.
final CompletableFuture<Void> loadFuture = new CompletableFuture<>();
topicFuture.whenComplete((res, ex) -> {
if (ex == null) {
loadFuture.complete(null);
} else {
loadFuture.completeExceptionally(ex);
}
});
// CREATE events are only dispatched when this call may create the topic; their completion
// notification follows topicFuture, while the LOAD completion notification follows loadFuture.
if (createIfMissing) {
topicEventsDispatcher.notify(topic, TopicEvent.CREATE, EventStage.BEFORE);
topicEventsDispatcher.notifyOnCompletion(topicFuture, topic, TopicEvent.CREATE);
}
topicEventsDispatcher.notifyOnCompletion(loadFuture, topic, TopicEvent.LOAD);
// Once we have the configuration, we can proceed with the async open operation
ManagedLedgerFactory managedLedgerFactory =
getManagedLedgerFactoryForTopic(topicName, managedLedgerConfig.getStorageClassName());
managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig,
new OpenLedgerCallback() {
// Ledger opened: build the topic object and run its asynchronous start-up checks.
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
try {
// System topics get the dedicated SystemTopic class; everything else goes through newTopic.
PersistentTopic persistentTopic = isSystemTopic(topic)
? new SystemTopic(topic, ledger, BrokerService.this)
: newTopic(topic, ledger, BrokerService.this, PersistentTopic.class);
persistentTopic.setCreateFuture(topicFuture);
persistentTopic
.initialize()
.thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
.thenCompose(__ -> persistentTopic.checkReplication())
.thenCompose(v -> {
// Also check dedup status
return persistentTopic.checkDeduplicationStatus();
})
.thenRun(() -> {
log.info("Created topic {} - dedup is {}", topic,
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
// complete() returns false when topicFuture was already completed elsewhere; in that case
// the freshly built topic is not handed out, so it is closed and its map entry removed.
if (!topicFuture.complete(Optional.of(persistentTopic))) {
// Check create persistent topic timeout.
if (topicFuture.isCompletedExceptionally()) {
log.warn("{} future is already completed with failure {}, closing"
+ " the topic", topic, FutureUtil.getException(topicFuture));
} else {
// It should not happen.
log.error("{} future is already completed by another thread, "
+ "which is not expected. Closing the current one", topic);
}
// The close is submitted to the executor; the map entry is removed only after close() has
// finished, whether or not the close itself succeeded.
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic, topicFuture);
if (ex != null) {
log.warn("[{}] Get an error when closing topic.",
topic, ex);
}
});
});
} else {
// Only a topic that was actually published through topicFuture is added to the stats maps.
addTopicToStatsMaps(topicName, persistentTopic);
}
})
.exceptionally((ex) -> {
// Reached when any stage of the chain above fails (initialize, compaction subscription,
// replication check, dedup check, or the thenRun body itself).
// NOTE(review): ex is bound to the second {} placeholder, so with SLF4J-style formatting only
// its toString() is logged here, not the stack trace - confirm that this is intended.
log.warn("Replication or dedup check failed."
+ " Removing topic from topics list {}, {}", topic, ex);
// Close the half-started topic first; topicFuture is failed only once close() has completed,
// after the map entry has been removed.
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, closeEx) -> {
topics.remove(topic, topicFuture);
if (closeEx != null) {
log.warn("[{}] Get an error when closing topic.",
topic, closeEx);
}
topicFuture.completeExceptionally(ex);
});
});
return null;
});
} catch (Exception e) {
// Synchronous failure while constructing the topic or wiring up the chain above.
// NOTE(review): only the message is logged; the exception itself is propagated via topicFuture.
log.warn("Failed to create topic {}: {}", topic, e.getMessage());
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(e);
}
}
// Ledger could not be opened: either "topic does not exist" on a load-only call, or a real failure.
@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
if (!createIfMissing && exception instanceof ManagedLedgerNotFoundException) {
// We were just trying to load a topic and the topic doesn't exist
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
// Order matters: loadFuture must be failed before topicFuture completes normally, otherwise the
// whenComplete mirror registered above would complete loadFuture successfully.
loadFuture.completeExceptionally(exception);
topicFuture.complete(Optional.empty());
} else {
log.warn("Failed to create topic {}", topic, exception);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(new PersistenceException(exception));
}
}
// The supplier re-evaluates namespace ownership on demand and the trailing null is handed back as
// ctx - presumably the factory uses the supplier to abort the open when ownership is lost; confirm
// against ManagedLedgerFactory#asyncOpen.
}, () -> isTopicNsOwnedByBrokerAsync(topicName), null);
}).exceptionally((exception) -> {
// Handles a failure of any earlier stage (max-topics check, consistency validation, migration
// check, config lookup) as well as anything thrown synchronously inside the thenAccept body.
// The cause is inspected because a failure propagated from an earlier stage arrives wrapped in a
// CompletionException.
boolean migrationFailure = exception.getCause() instanceof TopicMigratedException;
String msg = migrationFailure ? "Topic is already migrated" :
"Failed to get topic configuration:";
log.warn("[{}] {} {}", topic, msg, exception.getMessage(), exception);
// remove topic from topics-map in different thread to avoid possible deadlock if
// createPersistentTopic-thread only tries to handle this future-result
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(exception);
return null;
});
}