in pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java [797:1067]
/**
 * Starts this Pulsar broker service instance.
 *
 * <p>While holding {@code mutex}, the method validates the broker configuration, creates the metadata
 * stores and core services, starts the broker service and the web service, completes
 * {@code readyForIncomingRequestsFuture}, starts the protocol handlers and the optional services, and
 * finally moves {@code state} to {@code State.Started}.
 *
 * <p>Any exception raised inside the startup sequence (including the configuration validation errors) is
 * logged, converted with {@code PulsarServerException.from}, used to complete
 * {@code readyForIncomingRequestsFuture} exceptionally, and then rethrown.
 *
 * @throws PulsarServerException if the service is not in the {@code Init} state or if any startup step fails
 */
public void start() throws PulsarServerException {
    LOG.info("Starting Pulsar Broker service; version: '{}'",
            (brokerVersion != null ? brokerVersion : "unknown"));
    LOG.info("Git Revision {}", PulsarVersion.getGitSha());
    LOG.info("Git Branch {}", PulsarVersion.getGitBranch());
    LOG.info("Built by {} on {} at {}",
            PulsarVersion.getBuildUser(),
            PulsarVersion.getBuildHost(),
            PulsarVersion.getBuildTime());
    // Wall-clock start time in milliseconds, used to report the bootstrap duration at the end.
    long startTimestamp = System.currentTimeMillis();
    mutex.lock();
    try {
        // ---- Pre-flight checks: service state and configuration consistency ----
        if (state != State.Init) {
            throw new PulsarServerException("Cannot start the service once it was stopped");
        }
        if (config.getWebServicePort().isEmpty() && config.getWebServicePortTls().isEmpty()) {
            throw new IllegalArgumentException("webServicePort/webServicePortTls must be present");
        }
        if (config.isAuthorizationEnabled() && !config.isAuthenticationEnabled()) {
            throw new IllegalStateException("Invalid broker configuration. Authentication must be enabled with "
                    + "authenticationEnabled=true when authorization is enabled with authorizationEnabled=true.");
        }
        // When both limits are configured (> 0), the retention size must be strictly larger than the
        // backlog quota size limit.
        if (config.getDefaultRetentionSizeInMB() > 0
                && config.getBacklogQuotaDefaultLimitBytes() > 0
                && config.getBacklogQuotaDefaultLimitBytes()
                >= (config.getDefaultRetentionSizeInMB() * 1024L * 1024L)) {
            throw new IllegalArgumentException(String.format("The retention size must > the backlog quota limit "
                            + "size, but the configured backlog quota limit bytes is %d, the retention size is %d",
                    config.getBacklogQuotaDefaultLimitBytes(),
                    config.getDefaultRetentionSizeInMB() * 1024L * 1024L));
        }
        // Same rule for the time-based limits. The minutes-to-seconds conversion is done in long
        // arithmetic (60L) so that a very large retention value cannot overflow and distort the check.
        if (config.getDefaultRetentionTimeInMinutes() > 0
                && config.getBacklogQuotaDefaultLimitSecond() > 0
                && config.getBacklogQuotaDefaultLimitSecond() >= config.getDefaultRetentionTimeInMinutes() * 60L) {
            throw new IllegalArgumentException(String.format("The retention time must > the backlog quota limit "
                            + "time, but the configured backlog quota limit time duration is %d, "
                            + "the retention time duration is %d",
                    config.getBacklogQuotaDefaultLimitSecond(),
                    config.getDefaultRetentionTimeInMinutes() * 60L));
        }

        // ---- OpenTelemetry stats holders ----
        openTelemetryTopicStats = new OpenTelemetryTopicStats(this);
        openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this);
        openTelemetryProducerStats = new OpenTelemetryProducerStats(this);
        openTelemetryReplicatorStats = new OpenTelemetryReplicatorStats(this);
        openTelemetryReplicatedSubscriptionStats = new OpenTelemetryReplicatedSubscriptionStats(this);

        // ---- Metadata stores ----
        // A synchronizer is only created when the corresponding sync event topic is configured.
        localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
                ? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
                : null;
        localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer,
                openTelemetry.getOpenTelemetryService().getOpenTelemetry());
        localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);

        coordinationService = new CoordinationServiceImpl(localMetadataStore);

        if (config.isConfigurationStoreSeparated()) {
            configMetadataSynchronizer = StringUtils.isNotBlank(config.getConfigurationMetadataSyncEventTopic())
                    ? new PulsarMetadataEventSynchronizer(this, config.getConfigurationMetadataSyncEventTopic())
                    : null;
            configurationMetadataStore = createConfigurationMetadataStore(configMetadataSynchronizer,
                    openTelemetry.getOpenTelemetryService().getOpenTelemetry());
            shouldShutdownConfigurationMetadataStore = true;
        } else {
            // The configuration store is the local store itself, so it is flagged as not needing a
            // separate shutdown.
            configurationMetadataStore = localMetadataStore;
            shouldShutdownConfigurationMetadataStore = false;
        }
        pulsarResources = newPulsarResources();

        orderedExecutor = newOrderedExecutor();

        // Initialize the message protocol handlers
        protocolHandlers = ProtocolHandlers.load(config);
        protocolHandlers.initialize(config);

        // Now we are ready to start services
        this.bkClientFactory = newBookKeeperClientFactory();

        managedLedgerStorage = newManagedLedgerStorage();

        this.brokerService = newBrokerService(this);

        // Start load management service (even if load balancing is disabled)
        this.loadManager.set(LoadManager.create(this));

        // needs load management service and before start broker service,
        this.startNamespaceService();

        schemaStorage = createAndStartSchemaStorage();
        schemaRegistryService = SchemaRegistryService.create(
                schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), this);

        // ---- Ledger offloader ----
        OffloadPoliciesImpl defaultOffloadPolicies =
                OffloadPoliciesImpl.create(this.getConfiguration().getProperties());

        OrderedScheduler offloaderScheduler = getOffloaderScheduler(defaultOffloadPolicies);
        int interval = config.getManagedLedgerStatsPeriodSeconds();
        boolean exposeTopicMetrics = config.isExposeTopicLevelMetricsInPrometheus();

        offloaderStats = LedgerOffloaderStats.create(config.isExposeManagedLedgerMetricsInPrometheus(),
                exposeTopicMetrics, offloaderScheduler, interval);
        this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies);

        // ---- Broker interceptor and broker service ----
        setBrokerInterceptor(newBrokerInterceptor());
        // use getter to support mocking getBrokerInterceptor method in tests
        BrokerInterceptor interceptor = getBrokerInterceptor();
        if (interceptor != null) {
            brokerService.setInterceptor(interceptor);
            interceptor.initialize(this);
        }
        brokerService.start();

        // ---- Web service ----
        // Load additional servlets
        this.brokerAdditionalServlets = AdditionalServlets.load(config);

        this.webService = new WebService(this);
        createMetricsServlet();
        this.addWebServerHandlers(webService, metricsServlet, this.config);
        this.webService.start();

        // Refresh addresses and update configuration, since the port might have been dynamically assigned
        if (config.getBrokerServicePort().equals(Optional.of(0))) {
            config.setBrokerServicePort(brokerService.getListenPort());
        }
        if (config.getBrokerServicePortTls().equals(Optional.of(0))) {
            config.setBrokerServicePortTls(brokerService.getListenPortTls());
        }
        this.webServiceAddress = webAddress(config);
        this.webServiceAddressTls = webAddressTls(config);
        this.brokerServiceUrl = brokerUrl(config);
        this.brokerServiceUrlTls = brokerUrlTls(config);

        // the broker id is used in the load manager to identify the broker
        // it should not be used for making connections to the broker
        this.brokerId =
                String.format("%s:%s", advertisedAddress, config.getWebServicePort()
                        .or(config::getWebServicePortTls).orElseThrow());

        // Only load a compaction service factory when none has been set already.
        if (this.compactionServiceFactory == null) {
            this.compactionServiceFactory = loadCompactionServiceFactory();
        }

        // Hand the freshly resolved service URLs to the web socket service, when one is present.
        if (null != this.webSocketService) {
            ClusterDataImpl clusterData = ClusterDataImpl.builder()
                    .serviceUrl(webServiceAddress)
                    .serviceUrlTls(webServiceAddressTls)
                    .brokerServiceUrl(brokerServiceUrl)
                    .brokerServiceUrlTls(brokerServiceUrlTls)
                    .build();
            this.webSocketService.setLocalCluster(clusterData);
        }

        // Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
        this.nsService.initialize();

        // Start the leader election service
        startLeaderElectionService();

        // By starting the Load manager service, the broker will also become visible
        // to the rest of the broker by creating the registration z-node. This needs
        // to be done only when the broker is fully operative.
        //
        // The load manager service and its service unit state channel need to be initialized first
        // (namespace service depends on load manager)
        this.startLoadManagementService();

        // Start topic level policies service
        this.topicPoliciesService = initTopicPoliciesService();
        this.topicPoliciesService.start(this);

        // Register heartbeat and bootstrap namespaces.
        this.nsService.registerBootstrapNamespaces();

        // Register pulsar system namespaces and start transaction meta store service
        if (config.isTransactionCoordinatorEnabled()) {
            MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
            MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());

            this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(this);

            // The same timer instance is passed to both the buffer client and the metadata store service.
            this.transactionTimer =
                    new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
            transactionBufferClient = TransactionBufferClientImpl.create(this, transactionTimer,
                    config.getTransactionBufferClientMaxConcurrentRequests(),
                    config.getTransactionBufferClientOperationTimeoutInMills());

            transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
                    .newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
                    transactionBufferClient, transactionTimer);

            transactionBufferProvider = TransactionBufferProvider
                    .newProvider(config.getTransactionBufferProviderClassName());
            transactionPendingAckStoreProvider = TransactionPendingAckStoreProvider
                    .newProvider(config.getTransactionPendingAckStoreProviderClassName());

            openTelemetryTransactionCoordinatorStats = new OpenTelemetryTransactionCoordinatorStats(this);
            openTelemetryTransactionPendingAckStoreStats = new OpenTelemetryTransactionPendingAckStoreStats(this);
        }

        this.metricsGenerator = new MetricsGenerator(this);

        // the broker is ready to accept incoming requests by Pulsar binary protocol and http/https
        // The pending tasks are copied and cleared, and the readiness future is completed, under the
        // list's monitor; the copied tasks are executed after leaving the synchronized block.
        final List<Runnable> runnables;
        synchronized (pendingTasksBeforeReadyForIncomingRequests) {
            runnables = new ArrayList<>(pendingTasksBeforeReadyForIncomingRequests);
            pendingTasksBeforeReadyForIncomingRequests.clear();
            readyForIncomingRequestsFuture.complete(null);
        }
        runnables.forEach(Runnable::run);

        // Initialize the message protocol handlers.
        // start the protocol handlers only after the broker is ready,
        // so that the protocol handlers can access broker service properly.
        this.protocolHandlers.start(brokerService);
        Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlerChannelInitializers =
                this.protocolHandlers.newChannelInitializers();
        this.brokerService.startProtocolHandlers(protocolHandlerChannelInitializers);

        acquireSLANamespace();

        // start function worker service if necessary
        this.startWorkerService(brokerService.getAuthenticationService(), brokerService.getAuthorizationService());

        // start packages management service if necessary
        if (config.isEnablePackagesManagement()) {
            this.startPackagesManagementService();
        }

        // Start the task to publish resource usage, if necessary
        // Defaults to the disabled manager; replaced by a reflectively created instance (constructor taking
        // a PulsarService) when a transport class name is configured.
        this.resourceUsageTransportManager = DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
        if (isNotBlank(config.getResourceUsageTransportClassName())) {
            Class<?> clazz = Class.forName(config.getResourceUsageTransportClassName());
            Constructor<?> ctor = clazz.getConstructor(PulsarService.class);
            Object object = ctor.newInstance(this);
            this.resourceUsageTransportManager = (ResourceUsageTopicTransportManager) object;
        }
        this.resourceGroupServiceManager = new ResourceGroupService(this);

        // Start the metadata synchronizers that were created earlier in this method, if any.
        if (localMetadataSynchronizer != null) {
            localMetadataSynchronizer.start();
        }
        if (configMetadataSynchronizer != null) {
            configMetadataSynchronizer.start();
        }

        // ---- Startup summary ----
        long currentTimestamp = System.currentTimeMillis();
        final long bootstrapTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - startTimestamp);

        // Both ports are unwrapped with get() (guarded by isPresent()) so the log shows the port number
        // rather than the Optional wrapper's toString() form.
        final String bootstrapMessage = "bootstrap service "
                + (config.getWebServicePort().isPresent() ? "port = " + config.getWebServicePort().get() : "")
                + (config.getWebServicePortTls().isPresent()
                ? ", tls-port = " + config.getWebServicePortTls().get() : "")
                + (StringUtils.isNotEmpty(brokerServiceUrl) ? ", broker url= " + brokerServiceUrl : "")
                + (StringUtils.isNotEmpty(brokerServiceUrlTls) ? ", broker tls url= " + brokerServiceUrlTls : "");
        LOG.info("messaging service is ready, bootstrap_seconds={}, {}, cluster={}, configs={}",
                bootstrapTimeSeconds, bootstrapMessage, config.getClusterName(), config);

        state = State.Started;
    } catch (Exception e) {
        LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e);
        // Propagate the failure to anyone waiting on the readiness future before rethrowing.
        PulsarServerException startException = PulsarServerException.from(e);
        readyForIncomingRequestsFuture.completeExceptionally(startException);
        throw startException;
    } finally {
        mutex.unlock();
    }
}