def handleWriteTxnMarkersRequest()

in core/src/main/scala/kafka/server/KafkaApis.scala [1603:1774]


  def handleWriteTxnMarkersRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
    // We check for Alter permission on the Cluster resource first. If it is not granted, we fall back to
    // authorizing the ClusterAction operation; the latter throws an exception if it is denied.
    if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
      authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    }
    val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest]
    val errors = new ConcurrentHashMap[java.lang.Long, util.Map[TopicPartition, Errors]]()
    val markers = writeTxnMarkersRequest.markers
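    // Number of markers whose appends are still outstanding; the final response is sent once this reaches zero.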
    val numAppends = new AtomicInteger(markers.size)

    if (numAppends.get == 0) {
      requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
      return
    }

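    // Merges a marker's per-partition errors into the per-producer map backing the final response.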
    def updateErrors(producerId: Long, currentErrors: ConcurrentHashMap[TopicPartition, Errors]): Unit = {
      val previousErrors = errors.putIfAbsent(producerId, currentErrors)
      if (previousErrors != null)
        previousErrors.putAll(currentErrors)
    }

    /**
      * This is the callback invoked when a log append of transaction markers completes. It can be called multiple
      * times while handling a single WriteTxnMarkersRequest because there is one append per TransactionMarker in
      * the request, so there may be multiple appends of markers to the log. The final response is sent only
      * after all appends have returned.
      */
    def maybeSendResponseCallback(producerId: Long, result: TransactionResult, currentErrors: ConcurrentHashMap[TopicPartition, Errors]): Unit = {
      trace(s"End transaction marker append for producer id $producerId completed with status: $currentErrors")
      updateErrors(producerId, currentErrors)

      def maybeSendResponse(): Unit = {
        if (numAppends.decrementAndGet() == 0) {
          requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
        }
      }

      // The new group coordinator uses GroupCoordinator#completeTransaction so we do
      // not need to call GroupCoordinator#onTransactionCompleted here.
      if (config.isNewGroupCoordinatorEnabled) {
        maybeSendResponse()
        return
      }

      val successfulOffsetsPartitions = currentErrors.asScala.filter { case (topicPartition, error) =>
        topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == Errors.NONE
      }.keys

      // If no end transaction marker has been written to a __consumer_offsets partition, we do not
      // need to call GroupCoordinator#onTransactionCompleted.
      if (successfulOffsetsPartitions.isEmpty) {
        maybeSendResponse()
        return
      }

      // Otherwise, we call GroupCoordinator#onTransactionCompleted to materialize the offsets
      // into the cache and wait until the materialization has completed.
      groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result).whenComplete { (_, exception) =>
        if (exception != null) {
          error(s"Received an exception while trying to update the offsets cache on transaction marker append", exception)
          val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
          successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
          updateErrors(producerId, updatedErrors)
        }
        maybeSendResponse()
      }
    }

    // TODO: The current append API makes doing separate writes per producerId a little easier, but it would
    // be nice to have only one append to the log. This requires pushing the building of the control records
    // into Log so that we only append those having a valid producer epoch, and exposing a new appendControlRecord
    // API in ReplicaManager. For now, we have taken the simpler approach.
    var skippedMarkers = 0
    for (marker <- markers.asScala) {
      val producerId = marker.producerId
      val partitionsWithCompatibleMessageFormat = new mutable.ArrayBuffer[TopicPartition]

      val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]()
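      // Sort this marker's partitions: online partitions get a control record appended below, while
      // offline or unknown partitions are immediately recorded with UNKNOWN_TOPIC_OR_PARTITION.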
      marker.partitions.forEach { partition =>
        replicaManager.onlinePartition(partition) match {
          case Some(_) =>
            partitionsWithCompatibleMessageFormat += partition
          case None =>
            currentErrors.put(partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
        }
      }

      if (!currentErrors.isEmpty)
        updateErrors(producerId, currentErrors)

      if (partitionsWithCompatibleMessageFormat.isEmpty) {
        numAppends.decrementAndGet()
        skippedMarkers += 1
      } else {
        val controlRecordType = marker.transactionResult match {
          case TransactionResult.COMMIT => ControlRecordType.COMMIT
          case TransactionResult.ABORT => ControlRecordType.ABORT
        }

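        // Aggregate the per-partition results for this marker; once every partition has reported back,
        // the combined errors are handed to maybeSendResponseCallback exactly once.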
        val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()
        val numPartitions = new AtomicInteger(partitionsWithCompatibleMessageFormat.size)
        def addResultAndMaybeComplete(partition: TopicPartition, error: Errors): Unit = {
          markerResults.put(partition, error)
          // maybeSendResponseCallback must be called only once per marker; otherwise the response
          // could be sent prematurely.
          if (numPartitions.decrementAndGet() == 0) {
            maybeSendResponseCallback(producerId, marker.transactionResult, markerResults)
          }
        }

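        // Build control records for the partitions that go through the regular append path; when the new
        // group coordinator is enabled, __consumer_offsets partitions are delegated to it instead.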
        val controlRecords = mutable.Map.empty[TopicPartition, MemoryRecords]
        partitionsWithCompatibleMessageFormat.foreach { partition =>
          if (groupCoordinator.isNewGroupCoordinator && partition.topic == GROUP_METADATA_TOPIC_NAME) {
            // When the new group coordinator is used, writing the end marker is fully delegated
            // to the group coordinator.
            groupCoordinator.completeTransaction(
              partition,
              marker.producerId,
              marker.producerEpoch,
              marker.coordinatorEpoch,
              marker.transactionResult,
              Duration.ofMillis(config.requestTimeoutMs.toLong)
            ).whenComplete { (_, exception) =>
              val error = if (exception == null) {
                Errors.NONE
              } else {
                Errors.forException(exception) match {
                  case Errors.COORDINATOR_NOT_AVAILABLE | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.NOT_COORDINATOR =>
                    // The transaction coordinator does not expect these errors, so we translate them to
                    // NOT_LEADER_OR_FOLLOWER to signal that the group coordinator is not ready yet.
                    Errors.NOT_LEADER_OR_FOLLOWER
                  case error =>
                    error
                }
              }
              addResultAndMaybeComplete(partition, error)
            }
          } else {
            // Otherwise, the regular appendRecords path is used: for all non-__consumer_offsets partitions,
            // or for every partition when the new group coordinator is disabled.
            controlRecords += partition -> MemoryRecords.withEndTransactionMarker(
              producerId,
              marker.producerEpoch,
              new EndTransactionMarker(controlRecordType, marker.coordinatorEpoch)
            )
          }
        }

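        // Append the end transaction markers for the non-delegated partitions with acks=-1; each
        // partition's append result is fed back through addResultAndMaybeComplete.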
        if (controlRecords.nonEmpty) {
          replicaManager.appendRecords(
            timeout = config.requestTimeoutMs.toLong,
            requiredAcks = -1,
            internalTopicsAllowed = true,
            origin = AppendOrigin.COORDINATOR,
            entriesPerPartition = controlRecords,
            requestLocal = requestLocal,
            responseCallback = errors => {
              errors.foreachEntry { (tp, partitionResponse) =>
                addResultAndMaybeComplete(tp, partitionResponse.error)
              }
            }
          )
        }
      }
    }

    // If every marker was skipped because none of its partitions were online, no append callbacks will
    // fire, so we send the error response here.
    if (skippedMarkers == markers.size)
      requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
  }
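
For orientation, here is a minimal, standalone sketch of the completion pattern the handler relies on: an outer countdown (numAppends) tracks markers, an inner countdown (numPartitions) tracks the partitions of one marker, and the response is sent exactly once, when the outer counter reaches zero. Everything in the sketch is hypothetical (the object name, the String stand-ins for TopicPartition and Errors, the synchronous result delivery); it only illustrates the pattern and is not Kafka code.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical illustration of the two-level countdown used by handleWriteTxnMarkersRequest.
object TxnMarkerCountdownSketch {
  def main(args: Array[String]): Unit = {
    // Pretend request: producerId -> partitions touched by its marker.
    val markers = Map(
      1000L -> Seq("__consumer_offsets-0", "topicA-1"),
      2000L -> Seq("topicB-0")
    )

    val errors = new ConcurrentHashMap[java.lang.Long, ConcurrentHashMap[String, String]]()
    // Outer countdown: one unit per marker, mirrors numAppends.
    val outstandingMarkers = new AtomicInteger(markers.size)

    def maybeSendResponse(): Unit =
      if (outstandingMarkers.decrementAndGet() == 0)
        println(s"sending response: $errors")

    markers.foreach { case (producerId, partitions) =>
      val markerResults = new ConcurrentHashMap[String, String]()
      // Inner countdown: one unit per partition of this marker, mirrors numPartitions.
      val outstandingPartitions = new AtomicInteger(partitions.size)

      def addResult(partition: String, error: String): Unit = {
        markerResults.put(partition, error)
        // Only the last partition result completes the marker, so the marker-level
        // completion runs exactly once per marker.
        if (outstandingPartitions.decrementAndGet() == 0) {
          errors.put(producerId, markerResults)
          maybeSendResponse()
        }
      }

      // In KafkaApis these results arrive asynchronously from appendRecords or
      // GroupCoordinator#completeTransaction; here they are delivered inline.
      partitions.foreach(addResult(_, "NONE"))
    }
  }
}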