in core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala [99:237]
/** Returns the fixed name of this publisher. */
override def name(): String = {
  "BrokerMetadataPublisher"
}
/**
 * Applies a newly loaded metadata image, and the delta that produced it, to this broker's components.
 *
 * Steps, in order:
 *  1. The image is installed in the metadata cache.
 *  2. On the first publish only, the managers are initialized.
 *  3. If the delta carries topic changes, they are passed to the replica manager and to the group,
 *     transaction and (when present) share coordinators; the group coordinator is then told about
 *     the partitions of deleted topics.
 *  4. The dynamic config, client quota, SCRAM, delegation token and ACL publishers are invoked.
 *  5. The group coordinator and (when present) the share coordinator receive the new image.
 *  6. On the first publish only, replica manager initialization is finished.
 *
 * Whatever the outcome, `_firstPublish` is cleared and `firstPublishFuture` is completed on exit.
 *
 * @param delta    the metadata changes being published
 * @param newImage the metadata image that results from applying `delta`
 * @param manifest loader manifest; only forwarded to the ACL publisher here
 */
override def onMetadataUpdate(
  delta: MetadataDelta,
  newImage: MetadataImage,
  manifest: LoaderManifest
): Unit = {
  val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()
  val deltaName = if (_firstPublish) {
    s"initial MetadataDelta up to ${highestOffsetAndEpoch.offset}"
  } else {
    s"MetadataDelta up to ${highestOffsetAndEpoch.offset}"
  }

  // Runs a single publishing step. Anything thrown by the step is handed to
  // metadataPublishingFaultHandler together with `faultMessage` instead of being rethrown here.
  // The message is by-name so it is only built when a fault actually occurs.
  def publishStep(faultMessage: => String)(step: => Unit): Unit = {
    try {
      step
    } catch {
      case t: Throwable => metadataPublishingFaultHandler.handleFault(faultMessage, t)
    }
  }

  try {
    if (isTraceEnabled) {
      trace(s"Publishing delta $delta with highest offset $highestOffsetAndEpoch")
    }

    // Publish the new metadata image to the metadata cache.
    metadataCache.setImage(newImage)

    val metadataVersionLogMsg = s"metadata.version ${newImage.features().metadataVersion()}"
    if (_firstPublish) {
      info(s"Publishing initial metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
      // If this is the first metadata update we are applying, initialize the managers
      // first (but after setting up the metadata cache).
      initializeManagers(newImage)
    } else if (isDebugEnabled) {
      debug(s"Publishing metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
    }

    // Apply topic deltas. topicsDelta() may be null, hence the Option wrapper.
    Option(delta.topicsDelta()).foreach { topicsDelta =>
      // Notify the replica manager about changes to topics.
      publishStep(s"Error applying topics delta in $deltaName") {
        replicaManager.applyDelta(topicsDelta, newImage)
      }

      // Update the group coordinator of local changes.
      publishStep(s"Error updating group coordinator with local changes in $deltaName") {
        updateCoordinator(newImage,
          delta,
          Topic.GROUP_METADATA_TOPIC_NAME,
          groupCoordinator.onElection,
          (partitionIndex, leaderEpochOpt) => groupCoordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt))
        )
      }

      // Update the transaction coordinator of local changes.
      publishStep(s"Error updating txn coordinator with local changes in $deltaName") {
        updateCoordinator(newImage,
          delta,
          Topic.TRANSACTION_STATE_TOPIC_NAME,
          txnCoordinator.onElection,
          txnCoordinator.onResignation)
      }

      // Update the share coordinator of local changes, if one is present.
      shareCoordinator.foreach { coordinator =>
        publishStep(s"Error updating share coordinator with local changes in $deltaName") {
          updateCoordinator(newImage,
            delta,
            Topic.SHARE_GROUP_STATE_TOPIC_NAME,
            coordinator.onElection,
            (partitionIndex, leaderEpochOpt) => coordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt))
          )
        }
      }

      // Notify the group coordinator about deleted topics.
      publishStep(s"Error updating group coordinator with deleted partitions in $deltaName") {
        val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]()
        topicsDelta.deletedTopicIds().forEach { topicId =>
          // Deleted topics are looked up in the delta's base image (topicsDelta.image()),
          // not in newImage.
          val topicImage = topicsDelta.image().getTopic(topicId)
          topicImage.partitions().keySet().forEach {
            partitionId => deletedTopicPartitions += new TopicPartition(topicImage.name(), partitionId)
          }
        }
        if (deletedTopicPartitions.nonEmpty) {
          groupCoordinator.onPartitionsDeleted(deletedTopicPartitions.asJava, RequestLocal.noCaching.bufferSupplier)
        }
      }
    }

    // Apply configuration deltas.
    dynamicConfigPublisher.onMetadataUpdate(delta, newImage)

    // Apply client quotas delta.
    dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage)

    // Apply SCRAM delta.
    scramPublisher.onMetadataUpdate(delta, newImage)

    // Apply DelegationToken delta.
    delegationTokenPublisher.onMetadataUpdate(delta, newImage)

    // Apply ACL delta.
    aclPublisher.onMetadataUpdate(delta, newImage, manifest)

    // Propagate the new image to the group coordinator.
    // NOTE(review): this fault message is identical to the one used for the group coordinator
    // election/resignation step above; consider making it distinct.
    publishStep(s"Error updating group coordinator with local changes in $deltaName") {
      groupCoordinator.onNewMetadataImage(newImage, delta)
    }

    // Propagate the new image to the share coordinator.
    // NOTE(review): this fault message is identical to the one used for the share coordinator
    // election/resignation step above; consider making it distinct.
    publishStep(s"Error updating share coordinator with local changes in $deltaName") {
      shareCoordinator.foreach(coordinator => coordinator.onNewMetadataImage(newImage, delta))
    }

    if (_firstPublish) {
      finishInitializingReplicaManager()
    }
  } catch {
    // Anything not already handled by an individual step above ends up here.
    case t: Throwable => metadataPublishingFaultHandler.handleFault("Uncaught exception while " +
      s"publishing broker metadata from $deltaName", t)
  } finally {
    // Always mark the first publish as done and complete the future, even if publishing failed.
    _firstPublish = false
    firstPublishFuture.complete(null)
  }
}