in core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala [748:998]
/**
 * Ends (commits or aborts) a transaction using the transaction-version-2 protocol, in which the producer
 * epoch is bumped as part of every transaction completion. Callers whose `clientTransactionVersion` does
 * not support epoch bumps are delegated to `endTransactionWithTV1`.
 *
 * Processing happens in two phases:
 *  1. Under the transaction metadata lock, validate the producer id, producer epoch and current state,
 *     and either prepare a PrepareCommit/PrepareAbort transition, recognise a retry of an already
 *     completed request (answered with `Errors.NONE` and the stored producer id/epoch), or return an error.
 *  2. Append the prepared transition to the transaction log. If the append succeeds, re-validate under
 *     the lock, prepare the completion transition, respond to the caller and hand the markers to
 *     `txnMarkerChannelManager`. If it fails while an epoch fence was in progress, mark the failed fence
 *     on the cached metadata (when the coordinator epoch is unchanged).
 *
 * @param transactionalId          transactional id; a null or empty value is rejected with `INVALID_REQUEST`
 * @param producerId               producer id supplied by the caller
 * @param producerEpoch            producer epoch supplied by the caller
 * @param txnMarkerResult          whether the transaction should be committed or aborted
 * @param isFromClient             whether the request came from a client; in this TV2 path it is only
 *                                 consulted when a PrepareEpochFence transition is pending
 * @param clientTransactionVersion transaction version used by the caller
 * @param responseCallback         receives the error code plus the producer id and epoch to report; on
 *                                 errors these are `NO_PRODUCER_ID` / `NO_PRODUCER_EPOCH`
 * @param requestLocal             request-local state forwarded to the transaction log append
 */
private def endTransaction(transactionalId: String,
                           producerId: Long,
                           producerEpoch: Short,
                           txnMarkerResult: TransactionResult,
                           isFromClient: Boolean,
                           clientTransactionVersion: TransactionVersion,
                           responseCallback: EndTxnCallback,
                           requestLocal: RequestLocal): Unit = {
  if (!clientTransactionVersion.supportsEpochBump()) {
    endTransactionWithTV1(transactionalId, producerId, producerEpoch, txnMarkerResult, isFromClient, responseCallback, requestLocal)
  } else if (transactionalId == null || transactionalId.isEmpty) {
    responseCallback(Errors.INVALID_REQUEST, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH)
  } else {
    // Assigned under the metadata lock during validation and read again from the log-append callback.
    var isEpochFence = false
    // Snapshot of the stored producer id/epoch; returned to the caller when a retry is recognised.
    var producerIdCopy = RecordBatch.NO_PRODUCER_ID
    var producerEpochCopy = RecordBatch.NO_PRODUCER_EPOCH

    // Logs and throws for states this method must never observe (Dead / PrepareEpochFence as the current state).
    def failOnUnexpectedState(state: TransactionState): Nothing = {
      val errorMsg = s"Found transactionalId $transactionalId with state $state. " +
        s"This is illegal as we should never have transitioned to this state."
      fatal(errorMsg)
      throw new IllegalStateException(errorMsg)
    }

    // Message for the case where the coordinator owns the partition but has no cached metadata.
    def missingMetadataMsg: String =
      s"The coordinator still owns the transaction partition for $transactionalId, but there is " +
        s"no metadata in the cache; this is not expected"

    val preAppendResult: ApiResult[(Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).flatMap {
      case None =>
        Left(Errors.INVALID_PRODUCER_ID_MAPPING)

      case Some(epochAndTxnMetadata) =>
        val txnMetadata = epochAndTxnMetadata.transactionMetadata
        val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch

        txnMetadata.inLock {
          producerIdCopy = txnMetadata.producerId
          producerEpochCopy = txnMetadata.producerEpoch
          // PrepareEpochFence has slightly different epoch bumping logic so don't include it here.
          // Note that, it can only happen when the current state is Ongoing.
          isEpochFence = txnMetadata.pendingState.contains(PrepareEpochFence)
          // True if the client retried a request that had overflowed the epoch, and a new producer ID is stored in the txnMetadata
          val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId == producerId &&
            producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch == 0
          // True if the client retried an endTxn request, and the bumped producer epoch is stored in the txnMetadata.
          val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch == producerEpoch + 1
          val isRetry = retryOnEpochBump || retryOnOverflow

          val isValidEpoch =
            if (!isEpochFence) {
              // With transactions V2, state + same epoch is not sufficient to determine if a retry transition is valid.
              // If the epoch is the same it actually indicates the next endTransaction call, so in Prepare* states only
              // a retry (epoch already bumped in the metadata) is accepted. A higher epoch is reported as fenced even
              // where it could indicate an invalid state transition.
              txnMetadata.state match {
                case Ongoing | Empty | Dead | PrepareEpochFence =>
                  producerEpoch == txnMetadata.producerEpoch
                case PrepareCommit | PrepareAbort =>
                  retryOnEpochBump
                case CompleteCommit | CompleteAbort =>
                  retryOnEpochBump || retryOnOverflow || producerEpoch == txnMetadata.producerEpoch
              }
            } else {
              // If the epoch is going to be fenced, it bumps the epoch differently with TV2.
              (!isFromClient || producerEpoch == txnMetadata.producerEpoch) && producerEpoch >= txnMetadata.producerEpoch
            }

          // Builds the PrepareCommit/PrepareAbort transition, allocating a fresh producer id when the
          // current epoch is exhausted (never during an epoch fence).
          def generateTxnTransitMetadataForTxnCompletion(nextState: TransactionState, noPartitionAdded: Boolean): ApiResult[(Int, TxnTransitMetadata)] = {
            val nextProducerIdOrErrors: ApiResult[Long] =
              if (!isEpochFence && txnMetadata.isProducerEpochExhausted) {
                try {
                  Right(producerIdManager.generateProducerId())
                } catch {
                  case e: Exception => Left(Errors.forException(e))
                }
              } else {
                Right(RecordBatch.NO_PRODUCER_ID)
              }

            if (nextState == PrepareAbort && isEpochFence) {
              // We should clear the pending state to make way for the transition to PrepareAbort and also bump
              // the epoch in the transaction metadata we are about to append.
              txnMetadata.pendingState = None
              txnMetadata.producerEpoch = producerEpoch
              txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
            }

            nextProducerIdOrErrors.map { nextProducerId =>
              (coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, nextProducerId, time.milliseconds(), noPartitionAdded))
            }
          }

          // `isEpochFence` already records whether the pending state is PrepareEpochFence (the pending state has
          // not changed since it was computed under this lock), so no Option.get is needed here.
          if (txnMetadata.pendingTransitionInProgress && !isEpochFence) {
            // This check is performed first so that the pending transition can complete before the next checks.
            // With TV2, we may be transitioning over a producer epoch overflow, and the producer may be using the
            // new producer ID that is still only in pending state.
            Left(Errors.CONCURRENT_TRANSACTIONS)
          } else if (txnMetadata.producerId != producerId && !retryOnOverflow) {
            Left(Errors.INVALID_PRODUCER_ID_MAPPING)
          } else if (!isValidEpoch) {
            Left(Errors.PRODUCER_FENCED)
          } else txnMetadata.state match {
            case Ongoing =>
              val nextState = if (txnMarkerResult == TransactionResult.COMMIT) PrepareCommit else PrepareAbort
              generateTxnTransitMetadataForTxnCompletion(nextState, noPartitionAdded = false)

            case CompleteCommit =>
              if (txnMarkerResult == TransactionResult.COMMIT) {
                // A retried commit is answered with NONE; a fresh commit of a completed transaction is invalid.
                if (isRetry)
                  Left(Errors.NONE)
                else
                  logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
              } else {
                // Abort.
                if (isRetry)
                  logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                else
                  generateTxnTransitMetadataForTxnCompletion(PrepareAbort, noPartitionAdded = true)
              }

            case CompleteAbort =>
              if (txnMarkerResult == TransactionResult.ABORT) {
                if (isRetry)
                  Left(Errors.NONE)
                else
                  generateTxnTransitMetadataForTxnCompletion(PrepareAbort, noPartitionAdded = true)
              } else {
                // Commit.
                logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
              }

            case PrepareCommit =>
              if (txnMarkerResult == TransactionResult.COMMIT)
                Left(Errors.CONCURRENT_TRANSACTIONS)
              else
                logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)

            case PrepareAbort =>
              if (txnMarkerResult == TransactionResult.ABORT)
                Left(Errors.CONCURRENT_TRANSACTIONS)
              else
                logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)

            case Empty =>
              if (txnMarkerResult == TransactionResult.ABORT)
                generateTxnTransitMetadataForTxnCompletion(PrepareAbort, noPartitionAdded = true)
              else
                logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)

            case Dead | PrepareEpochFence =>
              failOnUnexpectedState(txnMetadata.state)
          }
        }
    }

    preAppendResult match {
      case Left(err) =>
        if (err == Errors.NONE) {
          // Retry of an already-completed request: report the producer id/epoch currently stored.
          responseCallback(err, producerIdCopy, producerEpochCopy)
        } else {
          debug(s"Aborting append of $txnMarkerResult to transaction log with coordinator and returning $err error to client for $transactionalId's EndTransaction request")
          responseCallback(err, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH)
        }

      case Right((coordinatorEpoch, newMetadata)) =>
        // Invoked with the outcome of the transaction log append.
        def sendTxnMarkersCallback(error: Errors): Unit = {
          if (error == Errors.NONE) {
            val preSendResult: ApiResult[(TransactionMetadata, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).flatMap {
              case None =>
                val errorMsg = missingMetadataMsg
                fatal(errorMsg)
                throw new IllegalStateException(errorMsg)

              case Some(epochAndMetadata) =>
                if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
                  val txnMetadata = epochAndMetadata.transactionMetadata
                  txnMetadata.inLock {
                    // The appended transition bumps the epoch, so either the request epoch or request epoch + 1 is accepted.
                    if (txnMetadata.producerId != producerId)
                      Left(Errors.INVALID_PRODUCER_ID_MAPPING)
                    else if (txnMetadata.producerEpoch != producerEpoch && txnMetadata.producerEpoch != producerEpoch + 1)
                      Left(Errors.PRODUCER_FENCED)
                    else if (txnMetadata.pendingTransitionInProgress)
                      Left(Errors.CONCURRENT_TRANSACTIONS)
                    else txnMetadata.state match {
                      case Empty | Ongoing | CompleteCommit | CompleteAbort =>
                        logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                      case PrepareCommit =>
                        if (txnMarkerResult != TransactionResult.COMMIT)
                          logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                        else
                          Right((txnMetadata, txnMetadata.prepareComplete(time.milliseconds())))
                      case PrepareAbort =>
                        if (txnMarkerResult != TransactionResult.ABORT)
                          logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                        else
                          Right((txnMetadata, txnMetadata.prepareComplete(time.milliseconds())))
                      case Dead | PrepareEpochFence =>
                        failOnUnexpectedState(txnMetadata.state)
                    }
                  }
                } else {
                  debug(s"The transaction coordinator epoch has changed to ${epochAndMetadata.coordinatorEpoch} after $txnMarkerResult was " +
                    s"successfully appended to the log for $transactionalId with old epoch $coordinatorEpoch")
                  Left(Errors.NOT_COORDINATOR)
                }
            }

            preSendResult match {
              case Left(err) =>
                info(s"Aborting sending of transaction markers after appended $txnMarkerResult to transaction log and returning $err error to client for $transactionalId's EndTransaction request")
                responseCallback(err, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH)

              case Right((txnMetadata, newPreSendMetadata)) =>
                // we can respond to the client immediately and continue to write the txn markers if
                // the log append was successful
                responseCallback(Errors.NONE, newPreSendMetadata.producerId, newPreSendMetadata.producerEpoch)
                txnMarkerChannelManager.addTxnMarkersToSend(coordinatorEpoch, txnMarkerResult, txnMetadata, newPreSendMetadata)
            }
          } else {
            info(s"Aborting sending of transaction markers and returning $error error to client for $transactionalId's EndTransaction request of $txnMarkerResult, " +
              s"since appending $newMetadata to transaction log with coordinator epoch $coordinatorEpoch failed")

            if (isEpochFence) {
              txnManager.getTransactionState(transactionalId).foreach {
                case None =>
                  warn(missingMetadataMsg)

                case Some(epochAndMetadata) =>
                  if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
                    // This was attempted epoch fence that failed, so mark this state on the metadata
                    epochAndMetadata.transactionMetadata.hasFailedEpochFence = true
                    warn(s"The coordinator failed to write an epoch fence transition for producer $transactionalId to the transaction log " +
                      s"with error $error. The epoch was increased to ${newMetadata.producerEpoch} but not returned to the client")
                  }
              }
            }

            responseCallback(error, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH)
          }
        }

        txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata,
          sendTxnMarkersCallback, requestLocal = requestLocal)
    }
  }
}