public void start()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java [797:1067]


    /**
     * Starts this Pulsar broker service instance.
     *
     * <p>The whole startup runs while holding {@code mutex}. The method first checks that the service is still
     * in {@code State.Init} and validates the broker configuration (web service ports, authentication vs.
     * authorization, retention vs. backlog quota defaults). It then creates the metadata stores and builds and
     * starts the broker components in a fixed order. After the broker service, web service, namespace service,
     * leader election, load management and topic policies are started, {@code readyForIncomingRequestsFuture}
     * is completed and the tasks queued in {@code pendingTasksBeforeReadyForIncomingRequests} are run. Protocol
     * handlers, the worker service, packages management, resource usage transport and the metadata
     * synchronizers are started afterwards, and finally the state is switched to {@code State.Started}.
     *
     * <p>Any exception raised inside the locked section is logged, converted through
     * {@code PulsarServerException.from}, used to complete {@code readyForIncomingRequestsFuture}
     * exceptionally, and rethrown.
     *
     * @throws PulsarServerException if the service is not in {@code State.Init}, if the configuration is
     *                               rejected by one of the validation checks, or if any startup step fails
     */
    public void start() throws PulsarServerException {
        LOG.info("Starting Pulsar Broker service; version: '{}'",
                (brokerVersion != null ? brokerVersion : "unknown"));
        LOG.info("Git Revision {}", PulsarVersion.getGitSha());
        LOG.info("Git Branch {}", PulsarVersion.getGitBranch());
        LOG.info("Built by {} on {} at {}",
                PulsarVersion.getBuildUser(),
                PulsarVersion.getBuildHost(),
                PulsarVersion.getBuildTime());

        // Wall-clock start time in milliseconds; used at the end to report the bootstrap duration.
        long startTimestamp = System.currentTimeMillis();

        mutex.lock();
        try {
            // The service can only be started from its initial state.
            if (state != State.Init) {
                throw new PulsarServerException("Cannot start the service once it was stopped");
            }

            // At least one of the plain / TLS web service ports has to be configured.
            if (config.getWebServicePort().isEmpty() && config.getWebServicePortTls().isEmpty()) {
                throw new IllegalArgumentException("webServicePort/webServicePortTls must be present");
            }

            if (config.isAuthorizationEnabled() && !config.isAuthenticationEnabled()) {
                throw new IllegalStateException("Invalid broker configuration. Authentication must be enabled with "
                        + "authenticationEnabled=true when authorization is enabled with authorizationEnabled=true.");
            }

            // When both defaults are set (> 0), the retention size must be strictly larger than the backlog quota.
            if (config.getDefaultRetentionSizeInMB() > 0
                    && config.getBacklogQuotaDefaultLimitBytes() > 0
                    && config.getBacklogQuotaDefaultLimitBytes()
                    >= (config.getDefaultRetentionSizeInMB() * 1024L * 1024L)) {
                throw new IllegalArgumentException(String.format("The retention size must > the backlog quota limit "
                                + "size, but the configured backlog quota limit bytes is %d, the retention size is %d",
                        config.getBacklogQuotaDefaultLimitBytes(),
                        config.getDefaultRetentionSizeInMB() * 1024L * 1024L));
            }

            // Same rule for the time-based limits. The minutes-to-seconds conversion is done in long
            // arithmetic (60L), mirroring the size check above, so that a very large retention value
            // cannot wrap around before it is compared or reported.
            if (config.getDefaultRetentionTimeInMinutes() > 0
                    && config.getBacklogQuotaDefaultLimitSecond() > 0
                    && config.getBacklogQuotaDefaultLimitSecond()
                    >= config.getDefaultRetentionTimeInMinutes() * 60L) {
                throw new IllegalArgumentException(String.format("The retention time must > the backlog quota limit "
                                + "time, but the configured backlog quota limit time duration is %d, "
                                + "the retention time duration is %d",
                        config.getBacklogQuotaDefaultLimitSecond(),
                        config.getDefaultRetentionTimeInMinutes() * 60L));
            }

            openTelemetryTopicStats = new OpenTelemetryTopicStats(this);
            openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this);
            openTelemetryProducerStats = new OpenTelemetryProducerStats(this);
            openTelemetryReplicatorStats = new OpenTelemetryReplicatorStats(this);
            openTelemetryReplicatedSubscriptionStats = new OpenTelemetryReplicatedSubscriptionStats(this);

            // A metadata synchronizer is only created when a sync event topic is configured; otherwise null.
            localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
                    ? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
                    : null;
            localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer,
                    openTelemetry.getOpenTelemetryService().getOpenTelemetry());
            localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);

            coordinationService = new CoordinationServiceImpl(localMetadataStore);

            // With a separated configuration store a dedicated store is created (and flagged for shutdown
            // by this service); otherwise the local metadata store is reused for configuration.
            if (config.isConfigurationStoreSeparated()) {
                configMetadataSynchronizer = StringUtils.isNotBlank(config.getConfigurationMetadataSyncEventTopic())
                        ? new PulsarMetadataEventSynchronizer(this, config.getConfigurationMetadataSyncEventTopic())
                        : null;
                configurationMetadataStore = createConfigurationMetadataStore(configMetadataSynchronizer,
                        openTelemetry.getOpenTelemetryService().getOpenTelemetry());
                shouldShutdownConfigurationMetadataStore = true;
            } else {
                configurationMetadataStore = localMetadataStore;
                shouldShutdownConfigurationMetadataStore = false;
            }
            pulsarResources = newPulsarResources();

            orderedExecutor = newOrderedExecutor();

            // Initialize the message protocol handlers
            protocolHandlers = ProtocolHandlers.load(config);
            protocolHandlers.initialize(config);

            // Now we are ready to start services
            this.bkClientFactory = newBookKeeperClientFactory();

            managedLedgerStorage = newManagedLedgerStorage();

            this.brokerService = newBrokerService(this);

            // Start load management service (even if load balancing is disabled)
            this.loadManager.set(LoadManager.create(this));

            // needs load management service and before start broker service,
            this.startNamespaceService();

            schemaStorage = createAndStartSchemaStorage();
            schemaRegistryService = SchemaRegistryService.create(
                    schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), this);

            OffloadPoliciesImpl defaultOffloadPolicies =
                    OffloadPoliciesImpl.create(this.getConfiguration().getProperties());

            OrderedScheduler offloaderScheduler = getOffloaderScheduler(defaultOffloadPolicies);
            int interval = config.getManagedLedgerStatsPeriodSeconds();
            boolean exposeTopicMetrics = config.isExposeTopicLevelMetricsInPrometheus();

            offloaderStats = LedgerOffloaderStats.create(config.isExposeManagedLedgerMetricsInPrometheus(),
                    exposeTopicMetrics, offloaderScheduler, interval);
            this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies);

            setBrokerInterceptor(newBrokerInterceptor());
            // use getter to support mocking getBrokerInterceptor method in tests
            BrokerInterceptor interceptor = getBrokerInterceptor();
            if (interceptor != null) {
                brokerService.setInterceptor(interceptor);
                interceptor.initialize(this);
            }
            brokerService.start();

            // Load additional servlets
            this.brokerAdditionalServlets = AdditionalServlets.load(config);

            this.webService = new WebService(this);
            createMetricsServlet();
            this.addWebServerHandlers(webService, metricsServlet, this.config);
            this.webService.start();

            // Refresh addresses and update configuration, since the port might have been dynamically assigned
            if (config.getBrokerServicePort().equals(Optional.of(0))) {
                config.setBrokerServicePort(brokerService.getListenPort());
            }
            if (config.getBrokerServicePortTls().equals(Optional.of(0))) {
                config.setBrokerServicePortTls(brokerService.getListenPortTls());
            }
            this.webServiceAddress = webAddress(config);
            this.webServiceAddressTls = webAddressTls(config);
            this.brokerServiceUrl = brokerUrl(config);
            this.brokerServiceUrlTls = brokerUrlTls(config);

            // the broker id is used in the load manager to identify the broker
            // it should not be used for making connections to the broker
            this.brokerId =
                    String.format("%s:%s", advertisedAddress, config.getWebServicePort()
                            .or(config::getWebServicePortTls).orElseThrow());

            // Keep a compaction service factory that was already set; only load one when none is present.
            if (this.compactionServiceFactory == null) {
                this.compactionServiceFactory = loadCompactionServiceFactory();
            }

            // If a web socket service is attached, hand it the addresses resolved above.
            if (null != this.webSocketService) {
                ClusterDataImpl clusterData = ClusterDataImpl.builder()
                        .serviceUrl(webServiceAddress)
                        .serviceUrlTls(webServiceAddressTls)
                        .brokerServiceUrl(brokerServiceUrl)
                        .brokerServiceUrlTls(brokerServiceUrlTls)
                        .build();
                this.webSocketService.setLocalCluster(clusterData);
            }

            // Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
            this.nsService.initialize();

            // Start the leader election service
            startLeaderElectionService();

            // By starting the Load manager service, the broker will also become visible
            // to the rest of the broker by creating the registration z-node. This needs
            // to be done only when the broker is fully operative.
            //
            // The load manager service and its service unit state channel need to be initialized first
            // (namespace service depends on load manager)
            this.startLoadManagementService();

            // Start topic level policies service
            this.topicPoliciesService = initTopicPoliciesService();
            this.topicPoliciesService.start(this);

            // Register heartbeat and bootstrap namespaces.
            this.nsService.registerBootstrapNamespaces();

            // Register pulsar system namespaces and start transaction meta store service
            if (config.isTransactionCoordinatorEnabled()) {
                MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
                MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());

                this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(this);

                this.transactionTimer =
                        new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
                transactionBufferClient = TransactionBufferClientImpl.create(this, transactionTimer,
                        config.getTransactionBufferClientMaxConcurrentRequests(),
                        config.getTransactionBufferClientOperationTimeoutInMills());

                transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
                        .newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
                        transactionBufferClient, transactionTimer);

                transactionBufferProvider = TransactionBufferProvider
                        .newProvider(config.getTransactionBufferProviderClassName());
                transactionPendingAckStoreProvider = TransactionPendingAckStoreProvider
                        .newProvider(config.getTransactionPendingAckStoreProviderClassName());

                openTelemetryTransactionCoordinatorStats = new OpenTelemetryTransactionCoordinatorStats(this);
                openTelemetryTransactionPendingAckStoreStats = new OpenTelemetryTransactionPendingAckStoreStats(this);
            }

            this.metricsGenerator = new MetricsGenerator(this);

            // the broker is ready to accept incoming requests by Pulsar binary protocol and http/https
            // The pending tasks are snapshotted and cleared, and the ready future is completed, under the
            // list's monitor; the snapshotted tasks are then run after leaving the synchronized block.
            final List<Runnable> runnables;
            synchronized (pendingTasksBeforeReadyForIncomingRequests) {
                runnables = new ArrayList<>(pendingTasksBeforeReadyForIncomingRequests);
                pendingTasksBeforeReadyForIncomingRequests.clear();
                readyForIncomingRequestsFuture.complete(null);
            }
            runnables.forEach(Runnable::run);

            // Initialize the message protocol handlers.
            // start the protocol handlers only after the broker is ready,
            // so that the protocol handlers can access broker service properly.
            this.protocolHandlers.start(brokerService);
            Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlerChannelInitializers =
                this.protocolHandlers.newChannelInitializers();
            this.brokerService.startProtocolHandlers(protocolHandlerChannelInitializers);

            acquireSLANamespace();

            // start function worker service if necessary
            this.startWorkerService(brokerService.getAuthenticationService(), brokerService.getAuthorizationService());

            // start packages management service if necessary
            if (config.isEnablePackagesManagement()) {
                this.startPackagesManagementService();
            }

            // Start the task to publish resource usage, if necessary
            // Defaults to the disabled transport manager; a configured class name is instantiated reflectively
            // through its single-argument (PulsarService) constructor.
            this.resourceUsageTransportManager = DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
            if (isNotBlank(config.getResourceUsageTransportClassName())) {
                Class<?> clazz = Class.forName(config.getResourceUsageTransportClassName());
                Constructor<?> ctor = clazz.getConstructor(PulsarService.class);
                Object object = ctor.newInstance(this);
                this.resourceUsageTransportManager = (ResourceUsageTopicTransportManager) object;
            }
            this.resourceGroupServiceManager = new ResourceGroupService(this);
            if (localMetadataSynchronizer != null) {
                localMetadataSynchronizer.start();
            }
            if (configMetadataSynchronizer != null) {
                configMetadataSynchronizer.start();
            }

            long currentTimestamp = System.currentTimeMillis();
            final long bootstrapTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - startTimestamp);

            // Unwrap both optional ports with get() so the log shows the port number rather than "Optional[...]".
            final String bootstrapMessage = "bootstrap service "
                    + (config.getWebServicePort().isPresent() ? "port = " + config.getWebServicePort().get() : "")
                    + (config.getWebServicePortTls().isPresent()
                            ? ", tls-port = " + config.getWebServicePortTls().get() : "")
                    + (StringUtils.isNotEmpty(brokerServiceUrl) ? ", broker url= " + brokerServiceUrl : "")
                    + (StringUtils.isNotEmpty(brokerServiceUrlTls) ? ", broker tls url= " + brokerServiceUrlTls : "");
            LOG.info("messaging service is ready, bootstrap_seconds={}, {}, cluster={}, configs={}",
                    bootstrapTimeSeconds, bootstrapMessage, config.getClusterName(), config);

            state = State.Started;
        } catch (Exception e) {
            // Log, then propagate the failure both to waiters on the ready future and to the caller.
            LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e);
            PulsarServerException startException = PulsarServerException.from(e);
            readyForIncomingRequestsFuture.completeExceptionally(startException);
            throw startException;
        } finally {
            mutex.unlock();
        }
    }