private void createPersistentTopic()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java [1769:1928]


    private void createPersistentTopic(final String topic, boolean createIfMissing,
                                       CompletableFuture<Optional<Topic>> topicFuture,
                                       Map<String, String> properties, @Nullable TopicPolicies topicPolicies) {
        TopicName topicName = TopicName.get(topic);
        final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());

        if (isTransactionInternalName(topicName)) {
            String msg = String.format("Can not create transaction system topic %s", topic);
            log.warn(msg);
            pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
            topicFuture.completeExceptionally(new NotAllowedException(msg));
            return;
        }

        CompletableFuture<Void> maxTopicsCheck = createIfMissing
                ? checkMaxTopicsPerNamespace(topicName, 1)
                : CompletableFuture.completedFuture(null);

        CompletableFuture<Void> isTopicAlreadyMigrated = checkTopicAlreadyMigrated(topicName);
        maxTopicsCheck.thenCompose(partitionedTopicMetadata -> validateTopicConsistency(topicName))
                .thenCompose(__ -> isTopicAlreadyMigrated)
                .thenCompose(__ -> getManagedLedgerConfig(topicName, topicPolicies))
        .thenAccept(managedLedgerConfig -> {
            if (isBrokerEntryMetadataEnabled() || isBrokerPayloadProcessorEnabled()) {
                // init managedLedger interceptor
                Set<BrokerEntryMetadataInterceptor> interceptors = new HashSet<>();
                for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
                    // add individual AppendOffsetMetadataInterceptor for each topic
                    if (interceptor instanceof AppendIndexMetadataInterceptor) {
                        interceptors.add(new AppendIndexMetadataInterceptor());
                    } else {
                        interceptors.add(interceptor);
                    }
                }
                managedLedgerConfig.setManagedLedgerInterceptor(
                        new ManagedLedgerInterceptorImpl(interceptors, brokerEntryPayloadProcessors));
            }
            managedLedgerConfig.setCreateIfMissing(createIfMissing);
            managedLedgerConfig.setProperties(properties);
            String shadowSource = managedLedgerConfig.getShadowSource();
            if (shadowSource != null) {
                managedLedgerConfig.setShadowSourceName(TopicName.get(shadowSource).getPersistenceNamingEncoding());
            }

            topicEventsDispatcher.notify(topic, TopicEvent.LOAD, EventStage.BEFORE);
            // load can fail with topicFuture completed non-exceptionally
            // work around this
            final CompletableFuture<Void> loadFuture = new CompletableFuture<>();
            topicFuture.whenComplete((res, ex) -> {
                if (ex == null) {
                    loadFuture.complete(null);
                } else {
                    loadFuture.completeExceptionally(ex);
                }
            });

            if (createIfMissing) {
                topicEventsDispatcher.notify(topic, TopicEvent.CREATE, EventStage.BEFORE);
                topicEventsDispatcher.notifyOnCompletion(topicFuture, topic, TopicEvent.CREATE);
            }
            topicEventsDispatcher.notifyOnCompletion(loadFuture, topic, TopicEvent.LOAD);

            // Once we have the configuration, we can proceed with the async open operation
            ManagedLedgerFactory managedLedgerFactory =
                    getManagedLedgerFactoryForTopic(topicName, managedLedgerConfig.getStorageClassName());
            managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig,
                    new OpenLedgerCallback() {
                        @Override
                        public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                            try {
                                PersistentTopic persistentTopic = isSystemTopic(topic)
                                        ? new SystemTopic(topic, ledger, BrokerService.this)
                                        : newTopic(topic, ledger, BrokerService.this, PersistentTopic.class);
                                persistentTopic.setCreateFuture(topicFuture);
                                persistentTopic
                                        .initialize()
                                        .thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
                                        .thenCompose(__ -> persistentTopic.checkReplication())
                                        .thenCompose(v -> {
                                            // Also check dedup status
                                            return persistentTopic.checkDeduplicationStatus();
                                        })
                                        .thenRun(() -> {
                                            log.info("Created topic {} - dedup is {}", topic,
                                            persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
                                            long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
                                                                        - topicCreateTimeMs;
                                            pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
                                            if (!topicFuture.complete(Optional.of(persistentTopic))) {
                                                // Check create persistent topic timeout.
                                                if (topicFuture.isCompletedExceptionally()) {
                                                    log.warn("{} future is already completed with failure {}, closing"
                                                        + " the topic", topic, FutureUtil.getException(topicFuture));
                                                } else {
                                                    // It should not happen.
                                                    log.error("{} future is already completed by another thread, "
                                                            + "which is not expected. Closing the current one", topic);
                                                }
                                                executor().submit(() -> {
                                                    persistentTopic.close().whenComplete((ignore, ex) -> {
                                                        topics.remove(topic, topicFuture);
                                                        if (ex != null) {
                                                            log.warn("[{}] Get an error when closing topic.",
                                                                    topic, ex);
                                                        }
                                                    });
                                                });
                                            } else {
                                                addTopicToStatsMaps(topicName, persistentTopic);
                                            }
                                        })
                                        .exceptionally((ex) -> {
                                            log.warn("Replication or dedup check failed."
                                                    + " Removing topic from topics list {}, {}", topic, ex);
                                            executor().submit(() -> {
                                                persistentTopic.close().whenComplete((ignore, closeEx) -> {
                                                    topics.remove(topic, topicFuture);
                                                    if (closeEx != null) {
                                                        log.warn("[{}] Get an error when closing topic.",
                                                                topic, closeEx);
                                                    }
                                                    topicFuture.completeExceptionally(ex);
                                                });
                                            });
                                            return null;
                                        });
                            } catch (Exception e) {
                                log.warn("Failed to create topic {}: {}", topic, e.getMessage());
                                pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
                                topicFuture.completeExceptionally(e);
                            }
                        }

                        @Override
                        public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
                            if (!createIfMissing && exception instanceof ManagedLedgerNotFoundException) {
                                // We were just trying to load a topic and the topic doesn't exist
                                pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
                                loadFuture.completeExceptionally(exception);
                                topicFuture.complete(Optional.empty());
                            } else {
                                log.warn("Failed to create topic {}", topic, exception);
                                pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
                                topicFuture.completeExceptionally(new PersistenceException(exception));
                            }
                        }
                    }, () -> isTopicNsOwnedByBrokerAsync(topicName), null);

        }).exceptionally((exception) -> {
            boolean migrationFailure = exception.getCause() instanceof TopicMigratedException;
            String msg = migrationFailure ? "Topic is already migrated" :
                "Failed to get topic configuration:";
            log.warn("[{}] {} {}", topic, msg, exception.getMessage(), exception);
            // remove topic from topics-map in different thread to avoid possible deadlock if
            // createPersistentTopic-thread only tries to handle this future-result
            pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
            topicFuture.completeExceptionally(exception);
            return null;
        });
    }