in core/src/main/scala/kafka/server/BrokerServer.scala [188:602]
override def startup(): Unit = {
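// Only one caller can move the status from SHUTDOWN to STARTING, so a concurrent or
// repeated startup() call simply returns without re-running the sequence below.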
if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
val startupDeadline = Deadline.fromDelay(time, config.serverMaxStartupTimeMs, TimeUnit.MILLISECONDS)
try {
sharedServer.startForBroker()
info("Starting broker")
val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin()
config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin))
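// Quota managers enforce client, replication and controller-mutation quotas; the
// "broker-<nodeId>-" string is used as a thread-name prefix. Reading dynamic broker configs
// from the metadata snapshot here is presumably intended to apply any stored dynamic
// overrides before the components below are constructed.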
quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-", ProcessRole.BrokerRole.toString)
DynamicBrokerConfig.readDynamicBrokerConfigsFromSnapshot(raftManager, config, quotaManagers, logContext)
/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
kafkaScheduler.startup()
/* register broker metrics */
brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
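// The KRaft metadata cache is this broker's local view of cluster metadata; it is kept up to
// date by the metadata publishers installed later in this method.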
metadataCache = new KRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())
// Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
// until we catch up on the metadata log and have up-to-date topic and broker configs.
logManager = LogManager(config,
sharedServer.metaPropsEnsemble.errorLogDirs().asScala.toSeq,
metadataCache,
kafkaScheduler,
time,
brokerTopicStats,
logDirFailureChannel)
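// The lifecycle manager handles broker registration, heartbeating and fencing. Its last
// argument is a callback that starts a shutdown thread once the lifecycle manager decides
// this broker should shut down (e.g. after a controller-initiated controlled shutdown).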
lifecycleManager = new BrokerLifecycleManager(config,
time,
s"broker-${config.nodeId}-",
logDirs = logManager.directoryIdsSet,
() => new Thread(() => shutdown(), "kafka-shutdown-thread").start())
// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"controller quorum voters future",
sharedServer.controllerQuorumVotersFuture,
startupDeadline, time)
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config)
clientToControllerChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
config,
channelName = "forwarding",
s"broker-${config.nodeId}-",
retryTimeoutMs = 60000
)
clientToControllerChannelManager.start()
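// The forwarding manager relays requests that only the controller can handle (for example
// ALTER_CONFIGS or CREATE_ACLS) over the "forwarding" channel created above.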
forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager, metrics)
clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, config.clientTelemetryMaxBytes, time, metrics)
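// The API version manager computes the ApiVersions advertised on broker listeners, taking
// the controller's supported versions (obtained through the forwarding manager) into account.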
val apiVersionManager = new DefaultApiVersionManager(
ListenerType.BROKER,
() => forwardingManager.controllerApiVersions,
brokerFeatures,
metadataCache,
config.unstableApiVersionsEnabled,
Optional.of(clientMetricsManager)
)
val shareFetchSessionCache: ShareSessionCache = new ShareSessionCache(config.shareGroupConfig.shareGroupMaxShareSessions())
val connectionDisconnectListeners = Seq(
clientMetricsManager.connectionDisconnectListener(),
shareFetchSessionCache.connectionDisconnectListener()
)
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
socketServer = new SocketServer(config,
metrics,
time,
credentialProvider,
apiVersionManager,
sharedServer.socketFactory,
connectionDisconnectListeners)
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
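// Resolve the advertised listeners: wildcard hostnames are replaced with a concrete host name
// and ephemeral (port 0) listeners are rewritten with the ports the SocketServer actually bound.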
val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()),
config.effectiveAdvertisedBrokerListeners.asJava).
withWildcardHostnamesResolved().
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))
remoteLogManagerOpt = createRemoteLogManager(listenerInfo)
alterPartitionManager = AlterPartitionManager(
config,
scheduler = kafkaScheduler,
controllerNodeProvider,
time = time,
metrics,
s"broker-${config.nodeId}-",
brokerEpochSupplier = () => lifecycleManager.brokerEpoch
)
alterPartitionManager.start()
val addPartitionsLogContext = new LogContext(s"[AddPartitionsToTxnManager broker=${config.brokerId}]")
val addPartitionsToTxnNetworkClient = NetworkUtils.buildNetworkClient("AddPartitionsManager", config, metrics, time, addPartitionsLogContext)
val addPartitionsToTxnManager = new AddPartitionsToTxnManager(
config,
addPartitionsToTxnNetworkClient,
metadataCache,
// The transaction coordinator is not created at this point so we must
// use a lambda here.
transactionalId => transactionCoordinator.partitionFor(transactionalId),
time
)
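// A dedicated controller channel is used for replica-to-directory (JBOD) assignments; the
// AssignmentsManager below batches those assignment events and sends them to the controller.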
val assignmentsChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
config,
"directory-assignments",
s"broker-${config.nodeId}-",
retryTimeoutMs = 60000
)
assignmentsManager = new AssignmentsManager(
time,
assignmentsChannelManager,
config.brokerId,
() => metadataCache.getImage(),
(directoryId: Uuid) => logManager.directoryPath(directoryId).
getOrElse("[unknown directory path]")
)
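// Bridge directory events coming from the replica manager: assignment changes go to the
// assignments manager, while a failed log directory is propagated to the controller through
// the lifecycle manager.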
val directoryEventHandler = new DirectoryEventHandler {
override def handleAssignment(partition: TopicIdPartition, directoryId: Uuid, reason: String, callback: Runnable): Unit =
assignmentsManager.onAssignment(partition, directoryId, reason, callback)
override def handleFailure(directoryId: Uuid): Unit =
lifecycleManager.propagateDirectoryFailure(directoryId, config.logDirFailureTimeoutMs)
}
/**
 * TODO: move this action queue to the request handler threads so we can simplify concurrency handling
 */
val defaultActionQueue = new DelayedActionQueue
this._replicaManager = new ReplicaManager(
config = config,
metrics = metrics,
time = time,
scheduler = kafkaScheduler,
logManager = logManager,
remoteLogManager = remoteLogManagerOpt,
quotaManagers = quotaManagers,
metadataCache = metadataCache,
logDirFailureChannel = logDirFailureChannel,
alterPartitionManager = alterPartitionManager,
brokerTopicStats = brokerTopicStats,
isShuttingDown = isShuttingDown,
threadNamePrefix = None, // The ReplicaManager only runs on the broker, and already includes the ID in thread names.
delayedRemoteFetchPurgatoryParam = None,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
directoryEventHandler = directoryEventHandler,
defaultActionQueue = defaultActionQueue
)
/* start token manager */
tokenManager = new DelegationTokenManager(new DelegationTokenManagerConfigs(config), tokenCache)
// Create and initialize an authorizer if one is configured.
authorizerPlugin = config.createNewAuthorizer(metrics, ProcessRole.BrokerRole.toString)
/* initializing the groupConfigManager */
groupConfigManager = new GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig))
/* create share coordinator */
shareCoordinator = createShareCoordinator()
/* create persister */
persister = createShareStatePersister()
groupCoordinator = createGroupCoordinator()
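// In KRaft mode producer IDs are allocated by the controller, so the producer ID manager
// requests blocks of IDs over the client-to-controller channel.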
val producerIdManagerSupplier = () => ProducerIdManager.rpc(
config.brokerId,
time,
() => lifecycleManager.brokerEpoch,
clientToControllerChannelManager
)
// Create the transaction coordinator, but don't start it until we've started the replica manager.
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise; it would be good to fix the underlying issue.
transactionCoordinator = TransactionCoordinator(config, replicaManager,
new KafkaScheduler(1, true, "transaction-log-manager-"),
producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config, clientToControllerChannelManager, groupCoordinator,
transactionCoordinator, shareCoordinator)
dynamicConfigHandlers = Map[ConfigType, ConfigHandler](
ConfigType.TOPIC -> new TopicConfigHandler(replicaManager, config, quotaManagers),
ConfigType.BROKER -> new BrokerConfigHandler(config, quotaManagers),
ConfigType.CLIENT_METRICS -> new ClientMetricsConfigHandler(clientMetricsManager),
ConfigType.GROUP -> new GroupConfigHandler(groupCoordinator))
val featuresRemapped = BrokerFeatures.createDefaultFeatureMap(brokerFeatures)
val brokerLifecycleChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
config,
"heartbeat",
s"broker-${config.nodeId}-",
config.brokerSessionTimeoutMs / 2 // KAFKA-14392
)
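// Start the lifecycle manager: it registers this broker with the active controller over the
// "heartbeat" channel and then keeps heartbeating. The broker epochs read from the clean
// shutdown files are included in the registration so the controller can tell clean restarts
// apart from unclean ones.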
lifecycleManager.start(
() => sharedServer.loader.lastAppliedOffset(),
brokerLifecycleChannelManager,
clusterId,
listenerInfo.toBrokerRegistrationRequest,
featuresRemapped,
logManager.readBrokerEpochFromCleanShutdownFiles()
)
// The FetchSessionCache is divided into NumFetchSessionCacheShards shards, each responsible
// for sessionIds in the range Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
val sessionIdRange = Int.MaxValue / NumFetchSessionCacheShards
val fetchSessionCacheShards = (0 until NumFetchSessionCacheShards)
.map(shardNum => new FetchSessionCacheShard(
config.maxIncrementalFetchSessionCacheSlots / NumFetchSessionCacheShards,
KafkaBroker.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS,
sessionIdRange,
shardNum
))
val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(fetchSessionCacheShards))
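// The share partition manager backs share-group ("queue") fetches: it tracks share sessions,
// acquired records and delivery counts, persisting state through the persister created above.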
sharePartitionManager = new SharePartitionManager(
replicaManager,
time,
shareFetchSessionCache,
config.shareGroupConfig.shareGroupRecordLockDurationMs,
config.shareGroupConfig.shareGroupDeliveryCountLimit,
config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
persister,
groupConfigManager,
brokerTopicStats
)
dataPlaneRequestProcessor = new KafkaApis(
requestChannel = socketServer.dataPlaneRequestChannel,
forwardingManager = forwardingManager,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
txnCoordinator = transactionCoordinator,
shareCoordinator = shareCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = config.nodeId,
config = config,
configRepository = metadataCache,
metadataCache = metadataCache,
metrics = metrics,
authorizerPlugin = authorizerPlugin,
quotas = quotaManagers,
fetchManager = fetchManager,
sharePartitionManager = sharePartitionManager,
brokerTopicStats = brokerTopicStats,
clusterId = clusterId,
time = time,
tokenManager = tokenManager,
apiVersionManager = apiVersionManager,
clientMetricsManager = clientMetricsManager,
groupConfigManager = groupConfigManager)
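// config.numIoThreads request handler threads drain the data-plane request channel into
// KafkaApis; the pool also exposes the RequestHandlerAvgIdlePercent metric named here.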
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, "RequestHandlerAvgIdlePercent")
metadataPublishers.add(new MetadataVersionConfigValidator(config, sharedServer.metadataPublishingFaultHandler))
brokerMetadataPublisher = new BrokerMetadataPublisher(config,
metadataCache,
logManager,
replicaManager,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
new DynamicConfigPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
dynamicConfigHandlers.toMap,
"broker"),
new DynamicClientQuotaPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
"broker",
clientQuotaMetadataManager,
),
new DynamicTopicClusterQuotaPublisher(
clusterId,
config,
sharedServer.metadataPublishingFaultHandler,
"broker",
quotaManagers,
),
new ScramPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
"broker",
credentialProvider),
new DelegationTokenPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
"broker",
tokenManager),
new AclPublisher(
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"broker",
authorizerPlugin.toJava
),
sharedServer.initialBrokerMetadataLoadFaultHandler,
sharedServer.metadataPublishingFaultHandler
)
// If the BrokerLifecycleManager's initial catch-up future fails, it means we timed out
// or are shutting down before we could catch up. Therefore, also fail the firstPublishFuture.
lifecycleManager.initialCatchUpFuture.whenComplete((_, e) => {
if (e != null) brokerMetadataPublisher.firstPublishFuture.completeExceptionally(e)
})
metadataPublishers.add(brokerMetadataPublisher)
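// The registration tracker watches incoming metadata and asks the lifecycle manager to
// re-send the broker registration if the current registration looks missing or stale.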
brokerRegistrationTracker = new BrokerRegistrationTracker(config.brokerId,
() => lifecycleManager.resendBrokerRegistration())
metadataPublishers.add(brokerRegistrationTracker)
// Register parts of the broker that can be reconfigured via dynamic configs. This needs to
// be done before we publish the dynamic configs, so that we don't miss anything.
config.dynamicConfig.addReconfigurables(this)
// Install all the metadata publishers.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"the broker metadata publishers to be installed",
sharedServer.loader.installPublishers(metadataPublishers), startupDeadline, time)
// Wait for this broker to contact the quorum, and for the active controller to acknowledge
// us as caught up. It will do this by returning a heartbeat response with isCaughtUp set to
// true. The BrokerLifecycleManager tracks this.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"the controller to acknowledge that we are caught up",
lifecycleManager.initialCatchUpFuture, startupDeadline, time)
// Wait for the first metadata update to be published. Metadata updates are not published
// until we read at least up to the high water mark of the cluster metadata partition.
// Usually the initial metadata is published before lifecycleManager.initialCatchUpFuture
// completes, so this wait is normally a no-op; it is kept as a cheap safety check.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"the initial broker metadata update to be published",
brokerMetadataPublisher.firstPublishFuture, startupDeadline, time)
// Now that we have loaded some metadata, we can log a reasonably up-to-date broker
// configuration. Keep in mind that KafkaConfig.originals is a mutable field that gets set
// by the dynamic configuration publisher. Ironically, KafkaConfig.originals does not
// contain the original configuration values.
new KafkaConfig(config.originals(), true)
// We're now ready to unfence the broker. This also allows this broker to transition
// from RECOVERY state to RUNNING state, once the controller unfences the broker.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"the broker to be unfenced",
lifecycleManager.setReadyToUnfence(), startupDeadline, time)
// Enable inbound TCP connections. Each endpoint will be started only once its matching
// authorizer future is completed.
val endpointReadyFutures = {
val builder = new EndpointReadyFutures.Builder()
builder.build(authorizerPlugin.toJava,
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
listenerInfo.listeners().values(),
listenerInfo.firstListener(),
config.earlyStartListeners.map(_.value()).asJava))
}
val authorizerFutures = endpointReadyFutures.futures().asScala.toMap
val enableRequestProcessingFuture = socketServer.enableRequestProcessing(authorizerFutures)
// Block here until all the authorizer futures are complete.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"all of the authorizer futures to be completed",
CompletableFuture.allOf(authorizerFutures.values.toSeq: _*), startupDeadline, time)
// Wait for all the SocketServer ports to be open, and the Acceptors to be started.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"all of the SocketServer Acceptors to be started",
enableRequestProcessingFuture, startupDeadline, time)
maybeChangeStatus(STARTING, STARTED)
} catch {
case e: Throwable =>
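// Move to STARTED even though startup failed: shutdown() only proceeds from the STARTED
// state, so this lets the call below clean up whatever was already initialized.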
maybeChangeStatus(STARTING, STARTED)
fatal("Fatal error during broker startup. Preparing to shut down", e)
shutdown()
throw if (e.isInstanceOf[ExecutionException]) e.getCause else e
}
}