private def endTransaction()

in core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala [748:998]


  /**
   * Handles an end-transaction (commit or abort) request for clients whose transaction version
   * supports epoch bumps. Requests from clients that do not support epoch bumps are delegated
   * unchanged to `endTransactionWithTV1`.
   *
   * The work is done in three phases:
   *  1. Pre-append validation: under the transaction metadata lock, check the producer id, the
   *     producer epoch and the current transaction state, and build the transit metadata for the
   *     `PrepareCommit` / `PrepareAbort` transition.
   *  2. Append: hand the transit metadata to `txnManager.appendTransactionToLog`.
   *  3. Post-append (`sendTxnMarkersCallback`): re-validate the cached metadata, prepare the
   *     completion transition, answer the client and enqueue the transaction markers.
   *
   * On every non-throwing path visible here, `responseCallback` is either invoked directly or is
   * invoked from the callback handed to `appendTransactionToLog`. The paths that throw
   * `IllegalStateException` do not invoke it.
   *
   * @param transactionalId          transactional id of the producer; `null` or empty is answered with
   *                                 `INVALID_REQUEST`
   * @param producerId               producer id carried by the request
   * @param producerEpoch            producer epoch carried by the request
   * @param txnMarkerResult          requested outcome, `COMMIT` or `ABORT`
   * @param isFromClient             only consulted on the epoch-fence path: when `true` the request epoch must
   *                                 equal the stored epoch exactly (NOTE(review): presumably `false` for
   *                                 coordinator-initiated aborts -- confirm against callers)
   * @param clientTransactionVersion transaction version of the client; selects between this path and
   *                                 `endTransactionWithTV1`, and is forwarded to `prepareAbortOrCommit`
   * @param responseCallback         invoked with `(error, producerId, producerEpoch)`
   * @param requestLocal             forwarded to `appendTransactionToLog`
   * @throws IllegalStateException if the metadata is found in state `Dead` or `PrepareEpochFence`, or if the
   *                               metadata is missing from the cache after a successful append
   */
  private def endTransaction(transactionalId: String,
                             producerId: Long,
                             producerEpoch: Short,
                             txnMarkerResult: TransactionResult,
                             isFromClient: Boolean,
                             clientTransactionVersion: TransactionVersion,
                             responseCallback: EndTxnCallback,
                             requestLocal: RequestLocal): Unit = {
    // Clients without epoch-bump support take the separate TV1 code path; nothing below runs for them.
    if (!clientTransactionVersion.supportsEpochBump()) {
      endTransactionWithTV1(transactionalId, producerId, producerEpoch, txnMarkerResult, isFromClient, responseCallback, requestLocal)
      return
    }
    // Assigned inside the metadata lock below and read again later in sendTxnMarkersCallback,
    // which is why it is a method-level var rather than a local val inside the lock.
    var isEpochFence = false
    if (transactionalId == null || transactionalId.isEmpty)
      responseCallback(Errors.INVALID_REQUEST, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH)
    else {
      // Snapshot of the stored producer id / epoch, taken under the lock. It is returned to the client
      // when the request turns out to be a retry of an already completed transaction (Errors.NONE below).
      var producerIdCopy = RecordBatch.NO_PRODUCER_ID
      var producerEpochCopy = RecordBatch.NO_PRODUCER_EPOCH
      val preAppendResult: ApiResult[(Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).flatMap {
        case None =>
          // No metadata is known for this transactional id.
          Left(Errors.INVALID_PRODUCER_ID_MAPPING)

        case Some(epochAndTxnMetadata) =>
          val txnMetadata = epochAndTxnMetadata.transactionMetadata
          val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch

          txnMetadata.inLock {
            producerIdCopy = txnMetadata.producerId
            producerEpochCopy = txnMetadata.producerEpoch
            // PrepareEpochFence has slightly different epoch bumping logic so don't include it here.
            // Note that, it can only happen when the current state is Ongoing.
            isEpochFence = txnMetadata.pendingState.contains(PrepareEpochFence)
            // True if the client retried a request that had overflowed the epoch, and a new producer ID is stored in the txnMetadata
            val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId == producerId &&
              producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch == 0
            // True if the client retried an endTxn request, and the bumped producer epoch is stored in the txnMetadata.
            // `producerEpoch + 1` is evaluated as an Int, so the comparison cannot wrap around at Short.MaxValue.
            val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch == producerEpoch + 1

            val isValidEpoch = {
              if (!isEpochFence) {
                // With transactions V2, state + same epoch is not sufficient to determine if a retry transition is valid. If the epoch is the
                // same it actually indicates the next endTransaction call. Instead, we want to check the epoch matches with the epoch in the retry conditions.
                // Return producer fenced even in the cases where the epoch is higher and could indicate an invalid state transition.
                // Use the following criteria to determine if a v2 retry is valid:
                txnMetadata.state match {
                  case Ongoing | Empty | Dead | PrepareEpochFence =>
                    // No completion has been started yet: only the exact stored epoch is accepted.
                    producerEpoch == txnMetadata.producerEpoch
                  case PrepareCommit | PrepareAbort =>
                    // A completion is in flight: only a retry carrying the pre-bump epoch is accepted.
                    retryOnEpochBump
                  case CompleteCommit | CompleteAbort =>
                    // Either a retry of the finished request, or a new request carrying the current epoch.
                    retryOnEpochBump || retryOnOverflow || producerEpoch == txnMetadata.producerEpoch
                }
              } else {
                // If the epoch is going to be fenced, it bumps the epoch differently with TV2.
                // Client requests must match the stored epoch exactly; other callers may pass any epoch
                // that is not lower than the stored one.
                (!isFromClient || producerEpoch == txnMetadata.producerEpoch) && producerEpoch >= txnMetadata.producerEpoch
              }
            }

            val isRetry = retryOnEpochBump || retryOnOverflow

            // Builds the (coordinatorEpoch, transit metadata) pair for moving to `nextState`, or an error
            // if a required new producer id could not be generated.
            def generateTxnTransitMetadataForTxnCompletion(nextState: TransactionState, noPartitionAdded: Boolean): ApiResult[(Int, TxnTransitMetadata)] = {
              // Maybe allocate new producer ID if we are bumping epoch and epoch is exhausted
              val nextProducerIdOrErrors =
                if (!isEpochFence && txnMetadata.isProducerEpochExhausted) {
                  try {
                    Right(producerIdManager.generateProducerId())
                  } catch {
                    // Any failure to allocate an id is translated into the matching protocol error.
                    case e: Exception => Left(Errors.forException(e))
                  }
                } else {
                  Right(RecordBatch.NO_PRODUCER_ID)
                }

              // Note: this mutates the cached metadata in place, and it does so before
              // prepareAbortOrCommit is called below and regardless of whether id allocation failed.
              if (nextState == PrepareAbort && isEpochFence) {
                // We should clear the pending state to make way for the transition to PrepareAbort and also bump
                // the epoch in the transaction metadata we are about to append.
                // NOTE(review): the stored epoch is overwritten with the request's epoch, which presumably
                // already carries the fenced (bumped) value -- confirm against the caller.
                txnMetadata.pendingState = None
                txnMetadata.producerEpoch = producerEpoch
                txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
              }

              nextProducerIdOrErrors.flatMap {
                nextProducerId =>
                  // NOTE(review): the asInstanceOf[Long] cast looks redundant if both Right branches above
                  // already yield a Long -- confirm the inferred type before removing it.
                  Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, nextProducerId.asInstanceOf[Long], time.milliseconds(), noPartitionAdded))
              }
            }

            // Validation order matters: pending transition first, then producer id, then epoch, then state.
            if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence) {
              // This check is performed first so that the pending transition can complete before the next checks.
              // With TV2, we may be transitioning over a producer epoch overflow, and the producer may be using the
              // new producer ID that is still only in pending state.
              Left(Errors.CONCURRENT_TRANSACTIONS)
            } else if (txnMetadata.producerId != producerId && !retryOnOverflow)
              // A mismatching producer id is tolerated only for a retry across an epoch overflow, where the
              // request still carries the previous producer id.
              Left(Errors.INVALID_PRODUCER_ID_MAPPING)
            else if (!isValidEpoch)
              Left(Errors.PRODUCER_FENCED)
            else txnMetadata.state match {
              case Ongoing =>
                val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
                  PrepareCommit
                else
                  PrepareAbort

                generateTxnTransitMetadataForTxnCompletion(nextState, false)
              case CompleteCommit =>
                if (txnMarkerResult == TransactionResult.COMMIT) {
                  if (isRetry)
                    // Left(Errors.NONE) is a "nothing to append, answer success" signal; it is turned into a
                    // successful response when preAppendResult is matched below.
                    Left(Errors.NONE)
                  else
                    logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                } else {
                  // Abort.
                  if (isRetry)
                    // A retried abort cannot match a transaction that has already committed.
                    logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                  else
                    // Abort with the current epoch and no partitions added: run an abort transition anyway.
                    generateTxnTransitMetadataForTxnCompletion(PrepareAbort, true)
                }
              case CompleteAbort =>
                if (txnMarkerResult == TransactionResult.ABORT) {
                  if (isRetry)
                    // Retried abort of an already aborted transaction: answer success without appending.
                    Left(Errors.NONE)
                  else
                    generateTxnTransitMetadataForTxnCompletion(PrepareAbort, true)
                } else {
                  // Commit.
                  logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                }
              case PrepareCommit =>
                // Same outcome requested while the commit is still in flight: ask the client to retry.
                if (txnMarkerResult == TransactionResult.COMMIT)
                  Left(Errors.CONCURRENT_TRANSACTIONS)
                else
                  logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
              case PrepareAbort =>
                // Same outcome requested while the abort is still in flight: ask the client to retry.
                if (txnMarkerResult == TransactionResult.ABORT)
                  Left(Errors.CONCURRENT_TRANSACTIONS)
                else
                  logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
              case Empty =>
                // Only an abort is accepted when no transaction is ongoing.
                if (txnMarkerResult == TransactionResult.ABORT) {
                  generateTxnTransitMetadataForTxnCompletion(PrepareAbort, true)
                } else {
                  logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                }
              case Dead | PrepareEpochFence =>
                // Treated as a coordinator bug: log fatally and throw; responseCallback is not invoked on this path.
                val errorMsg = s"Found transactionalId $transactionalId with state ${txnMetadata.state}. " +
                  s"This is illegal as we should never have transitioned to this state."
                fatal(errorMsg)
                throw new IllegalStateException(errorMsg)

            }
          }
      }

      preAppendResult match {
        case Left(err) =>
          if (err == Errors.NONE) {
            // Recognised retry of a completed transaction: succeed with the ids captured under the lock.
            responseCallback(err, producerIdCopy, producerEpochCopy)
          } else {
            debug(s"Aborting append of $txnMarkerResult to transaction log with coordinator and returning $err error to client for $transactionalId's EndTransaction request")
            responseCallback(err, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH)
          }

        case Right((coordinatorEpoch, newMetadata)) =>
          // Callback passed to appendTransactionToLog; `error` is the outcome of the log append.
          def sendTxnMarkersCallback(error: Errors): Unit = {
            if (error == Errors.NONE) {
              // The append succeeded. Re-read and re-validate the cached metadata before preparing the
              // completion transition.
              val preSendResult: ApiResult[(TransactionMetadata, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).flatMap {
                case None =>
                  val errorMsg = s"The coordinator still owns the transaction partition for $transactionalId, but there is " +
                    s"no metadata in the cache; this is not expected"
                  fatal(errorMsg)
                  throw new IllegalStateException(errorMsg)

                case Some(epochAndMetadata) =>
                  if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
                    val txnMetadata = epochAndMetadata.transactionMetadata
                    txnMetadata.inLock {
                      if (txnMetadata.producerId != producerId)
                        Left(Errors.INVALID_PRODUCER_ID_MAPPING)
                      // Both the request's epoch and that epoch bumped by one are accepted here.
                      else if (txnMetadata.producerEpoch != producerEpoch && txnMetadata.producerEpoch != producerEpoch + 1)
                        Left(Errors.PRODUCER_FENCED)
                      else if (txnMetadata.pendingTransitionInProgress)
                        Left(Errors.CONCURRENT_TRANSACTIONS)
                      else txnMetadata.state match {
                        // After a successful append the state must be the Prepare* state matching the request.
                        case Empty| Ongoing | CompleteCommit | CompleteAbort =>
                          logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                        case PrepareCommit =>
                          if (txnMarkerResult != TransactionResult.COMMIT)
                            logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                          else
                            Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
                        case PrepareAbort =>
                          if (txnMarkerResult != TransactionResult.ABORT)
                            logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                          else
                            Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
                        case Dead | PrepareEpochFence =>
                          val errorMsg = s"Found transactionalId $transactionalId with state ${txnMetadata.state}. " +
                            s"This is illegal as we should never have transitioned to this state."
                          fatal(errorMsg)
                          throw new IllegalStateException(errorMsg)

                      }
                    }
                  } else {
                    // The coordinator epoch changed between the pre-append check and this callback.
                    debug(s"The transaction coordinator epoch has changed to ${epochAndMetadata.coordinatorEpoch} after $txnMarkerResult was " +
                      s"successfully appended to the log for $transactionalId with old epoch $coordinatorEpoch")
                    Left(Errors.NOT_COORDINATOR)
                  }
              }

              preSendResult match {
                case Left(err) =>
                  info(s"Aborting sending of transaction markers after appended $txnMarkerResult to transaction log and returning $err error to client for $transactionalId's EndTransaction request")
                  responseCallback(err, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH)

                case Right((txnMetadata, newPreSendMetadata)) =>
                  // we can respond to the client immediately and continue to write the txn markers if
                  // the log append was successful
                  // The id and epoch returned to the client are those of the prepared completion metadata.
                  responseCallback(Errors.NONE, newPreSendMetadata.producerId, newPreSendMetadata.producerEpoch)

                  txnMarkerChannelManager.addTxnMarkersToSend(coordinatorEpoch, txnMarkerResult, txnMetadata, newPreSendMetadata)
              }
            } else {
              info(s"Aborting sending of transaction markers and returning $error error to client for $transactionalId's EndTransaction request of $txnMarkerResult, " +
                s"since appending $newMetadata to transaction log with coordinator epoch $coordinatorEpoch failed")

              // The append failed. If it was an epoch fence, flag that on the cached metadata, because
              // the epoch was already changed in the cache before the append was attempted.
              if (isEpochFence) {
                txnManager.getTransactionState(transactionalId).foreach {
                  case None =>
                    warn(s"The coordinator still owns the transaction partition for $transactionalId, but there is " +
                      s"no metadata in the cache; this is not expected")

                  case Some(epochAndMetadata) =>
                    if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
                      // This was attempted epoch fence that failed, so mark this state on the metadata
                      epochAndMetadata.transactionMetadata.hasFailedEpochFence = true
                      warn(s"The coordinator failed to write an epoch fence transition for producer $transactionalId to the transaction log " +
                        s"with error $error. The epoch was increased to ${newMetadata.producerEpoch} but not returned to the client")
                    }
                }
              }

              responseCallback(error, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH)
            }
          }

          // Write the Prepare* transition to the transaction log; sendTxnMarkersCallback is passed as the
          // completion callback for the append.
          txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata,
            sendTxnMarkersCallback, requestLocal = requestLocal)
      }
    }
  }