def appendTransactionToLog()

in core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala [463:627]


  def appendTransactionToLog(transactionalId: String,
                             coordinatorEpoch: Int,
                             newMetadata: TxnTransitMetadata,
                             responseCallback: Errors => Unit,
                             retryOnError: Errors => Boolean = _ => false): Unit = {

    // generate the message for this transaction metadata
    val keyBytes = TransactionLog.keyToBytes(transactionalId)
    val valueBytes = TransactionLog.valueToBytes(newMetadata)
    val timestamp = time.milliseconds()

    val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, new SimpleRecord(timestamp, keyBytes, valueBytes))
    val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(transactionalId))
    val recordsPerPartition = Map(topicPartition -> records)

    // set the callback function to update transaction status in cache after log append completed
    def updateCacheCallback(responseStatus: collection.Map[TopicPartition, PartitionResponse]): Unit = {
      // the append response should only contain the topics partition
      if (responseStatus.size != 1 || !responseStatus.contains(topicPartition))
        throw new IllegalStateException("Append status %s should only have one partition %s"
          .format(responseStatus, topicPartition))

      val status = responseStatus(topicPartition)

      var responseError = if (status.error == Errors.NONE) {
        Errors.NONE
      } else {
        debug(s"Appending $transactionalId's new metadata $newMetadata failed due to ${status.error.exceptionName}")

        // transform the log append error code to the corresponding coordinator error code
        status.error match {
          case Errors.UNKNOWN_TOPIC_OR_PARTITION
               | Errors.NOT_ENOUGH_REPLICAS
               | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND
               | Errors.REQUEST_TIMED_OUT => // note that for timed out request we return NOT_AVAILABLE error code to let client retry
            Errors.COORDINATOR_NOT_AVAILABLE

          case Errors.NOT_LEADER_FOR_PARTITION
               | Errors.KAFKA_STORAGE_ERROR =>
            Errors.NOT_COORDINATOR

          case Errors.MESSAGE_TOO_LARGE
               | Errors.RECORD_LIST_TOO_LARGE =>
            Errors.UNKNOWN_SERVER_ERROR

          case other =>
            other
        }
      }

      if (responseError == Errors.NONE) {
        // now try to update the cache: we need to update the status in-place instead of
        // overwriting the whole object to ensure synchronization
        getTransactionState(transactionalId) match {

          case Left(err) =>
            info(s"Accessing the cached transaction metadata for $transactionalId returns $err error; " +
              s"aborting transition to the new metadata and setting the error in the callback")
            responseError = err
          case Right(Some(epochAndMetadata)) =>
            val metadata = epochAndMetadata.transactionMetadata

            metadata.inLock {
              if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
                // the cache may have been changed due to txn topic partition emigration and immigration,
                // in this case directly return NOT_COORDINATOR to client and let it to re-discover the transaction coordinator
                info(s"The cached coordinator epoch for $transactionalId has changed to ${epochAndMetadata.coordinatorEpoch} after appended its new metadata $newMetadata " +
                  s"to the transaction log (txn topic partition ${partitionFor(transactionalId)}) while it was $coordinatorEpoch before appending; " +
                  s"aborting transition to the new metadata and returning ${Errors.NOT_COORDINATOR} in the callback")
                responseError = Errors.NOT_COORDINATOR
              } else {
                metadata.completeTransitionTo(newMetadata)
                debug(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId succeeded")
              }
            }

          case Right(None) =>
            // this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
            // return NOT_COORDINATOR to let the client re-discover the transaction coordinator
            info(s"The cached coordinator metadata does not exist in the cache anymore for $transactionalId after appended its new metadata $newMetadata " +
              s"to the transaction log (txn topic partition ${partitionFor(transactionalId)}) while it was $coordinatorEpoch before appending; " +
              s"aborting transition to the new metadata and returning ${Errors.NOT_COORDINATOR} in the callback")
            responseError = Errors.NOT_COORDINATOR
        }
      } else {
        // Reset the pending state when returning an error, since there is no active transaction for the transactional id at this point.
        getTransactionState(transactionalId) match {
          case Right(Some(epochAndTxnMetadata)) =>
            val metadata = epochAndTxnMetadata.transactionMetadata
            metadata.inLock {
              if (epochAndTxnMetadata.coordinatorEpoch == coordinatorEpoch) {
                if (retryOnError(responseError)) {
                  info(s"TransactionalId ${metadata.transactionalId} append transaction log for $newMetadata transition failed due to $responseError, " +
                    s"not resetting pending state ${metadata.pendingState} but just returning the error in the callback to let the caller retry")
                } else {
                  info(s"TransactionalId ${metadata.transactionalId} append transaction log for $newMetadata transition failed due to $responseError, " +
                    s"resetting pending state from ${metadata.pendingState}, aborting state transition and returning $responseError in the callback")

                  metadata.pendingState = None
                }
              } else {
                info(s"TransactionalId ${metadata.transactionalId} append transaction log for $newMetadata transition failed due to $responseError, " +
                  s"aborting state transition and returning the error in the callback since the coordinator epoch has changed from ${epochAndTxnMetadata.coordinatorEpoch} to $coordinatorEpoch")
              }
            }

          case Right(None) =>
            // Do nothing here, since we want to return the original append error to the user.
            info(s"TransactionalId $transactionalId append transaction log for $newMetadata transition failed due to $responseError, " +
              s"aborting state transition and returning the error in the callback since metadata is not available in the cache anymore")

          case Left(error) =>
            // Do nothing here, since we want to return the original append error to the user.
            info(s"TransactionalId $transactionalId append transaction log for $newMetadata transition failed due to $responseError, " +
              s"aborting state transition and returning the error in the callback since retrieving metadata returned $error")
        }

      }

      responseCallback(responseError)
    }

    inReadLock(stateLock) {
      // we need to hold the read lock on the transaction metadata cache until appending to local log returns;
      // this is to avoid the case where an emigration followed by an immigration could have completed after the check
      // returns and before appendRecords() is called, since otherwise entries with a high coordinator epoch could have
      // been appended to the log in between these two events, and therefore appendRecords() would append entries with
      // an old coordinator epoch that can still be successfully replicated on followers and make the log in a bad state.
      getTransactionState(transactionalId) match {
        case Left(err) =>
          responseCallback(err)

        case Right(None) =>
          // the coordinator metadata has been removed, reply to client immediately with NOT_COORDINATOR
          responseCallback(Errors.NOT_COORDINATOR)

        case Right(Some(epochAndMetadata)) =>
          val metadata = epochAndMetadata.transactionMetadata

          val append: Boolean = metadata.inLock {
            if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
              // the coordinator epoch has changed, reply to client immediately with NOT_COORDINATOR
              responseCallback(Errors.NOT_COORDINATOR)
              false
            } else {
              // do not need to check the metadata object itself since no concurrent thread should be able to modify it
              // under the same coordinator epoch, so directly append to txn log now
              true
            }
          }
          if (append) {
            replicaManager.appendRecords(
                newMetadata.txnTimeoutMs.toLong,
                TransactionLog.EnforcedRequiredAcks,
                internalTopicsAllowed = true,
                isFromClient = false,
                recordsPerPartition,
                updateCacheCallback,
                delayedProduceLock = Some(stateLock.readLock))

              trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log")
          }
      }
    }
  }