override def name()

in core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala [99:237]


  override def name(): String = "BrokerMetadataPublisher"

  override def onMetadataUpdate(
    delta: MetadataDelta,
    newImage: MetadataImage,
    manifest: LoaderManifest
  ): Unit = {
    val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()

    val deltaName = if (_firstPublish) {
      s"initial MetadataDelta up to ${highestOffsetAndEpoch.offset}"
    } else {
      s"MetadataDelta up to ${highestOffsetAndEpoch.offset}"
    }
    try {
      if (isTraceEnabled) {
        trace(s"Publishing delta $delta with highest offset $highestOffsetAndEpoch")
      }

      // Publish the new metadata image to the metadata cache.
      metadataCache.setImage(newImage)

      val metadataVersionLogMsg = s"metadata.version ${newImage.features().metadataVersion()}"

      if (_firstPublish) {
        info(s"Publishing initial metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")

        // If this is the first metadata update we are applying, initialize the managers
        // first (but after setting up the metadata cache).
        initializeManagers(newImage)
      } else if (isDebugEnabled) {
        debug(s"Publishing metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
      }

      // Apply topic deltas.
      Option(delta.topicsDelta()).foreach { topicsDelta =>
        try {
          // Notify the replica manager about changes to topics.
          replicaManager.applyDelta(topicsDelta, newImage)
        } catch {
          case t: Throwable => metadataPublishingFaultHandler.handleFault("Error applying topics " +
            s"delta in $deltaName", t)
        }
        try {
          // Update the group coordinator of local changes
          updateCoordinator(newImage,
            delta,
            Topic.GROUP_METADATA_TOPIC_NAME,
            groupCoordinator.onElection,
            (partitionIndex, leaderEpochOpt) => groupCoordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt))
          )
        } catch {
          case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
            s"coordinator with local changes in $deltaName", t)
        }
        try {
          // Update the transaction coordinator of local changes
          updateCoordinator(newImage,
            delta,
            Topic.TRANSACTION_STATE_TOPIC_NAME,
            txnCoordinator.onElection,
            txnCoordinator.onResignation)
        } catch {
          case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " +
            s"coordinator with local changes in $deltaName", t)
        }
        if (shareCoordinator.isDefined) {
          try {
            updateCoordinator(newImage,
              delta,
              Topic.SHARE_GROUP_STATE_TOPIC_NAME,
              shareCoordinator.get.onElection,
              (partitionIndex, leaderEpochOpt) => shareCoordinator.get.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt))
            )
          } catch {
            case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating share " +
              s"coordinator with local changes in $deltaName", t)
          }
        }
        try {
          // Notify the group coordinator about deleted topics.
          val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]()
          topicsDelta.deletedTopicIds().forEach { id =>
            val topicImage = topicsDelta.image().getTopic(id)
            topicImage.partitions().keySet().forEach {
              id => deletedTopicPartitions += new TopicPartition(topicImage.name(), id)
            }
          }
          if (deletedTopicPartitions.nonEmpty) {
            groupCoordinator.onPartitionsDeleted(deletedTopicPartitions.asJava, RequestLocal.noCaching.bufferSupplier)
          }
        } catch {
          case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
            s"coordinator with deleted partitions in $deltaName", t)
        }
      }

      // Apply configuration deltas.
      dynamicConfigPublisher.onMetadataUpdate(delta, newImage)

      // Apply client quotas delta.
      dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage)

      // Apply SCRAM delta.
      scramPublisher.onMetadataUpdate(delta, newImage)

      // Apply DelegationToken delta.
      delegationTokenPublisher.onMetadataUpdate(delta, newImage)

      // Apply ACL delta.
      aclPublisher.onMetadataUpdate(delta, newImage, manifest)

      try {
        // Propagate the new image to the group coordinator.
        groupCoordinator.onNewMetadataImage(newImage, delta)
      } catch {
        case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
          s"coordinator with local changes in $deltaName", t)
      }

      try {
        // Propagate the new image to the share coordinator.
        shareCoordinator.foreach(coordinator => coordinator.onNewMetadataImage(newImage, delta))
      } catch {
        case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating share " +
          s"coordinator with local changes in $deltaName", t)
      }

      if (_firstPublish) {
        finishInitializingReplicaManager()
      }
    } catch {
      case t: Throwable => metadataPublishingFaultHandler.handleFault("Uncaught exception while " +
        s"publishing broker metadata from $deltaName", t)
    } finally {
      _firstPublish = false
      firstPublishFuture.complete(null)
    }
  }