in core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala [471:562]
def completeTransitionTo(transitMetadata: TxnTransitMetadata): Unit = {
// metadata transition is valid only if all the following conditions are met:
//
// 1. the new state is already indicated in the pending state.
// 2. the epoch should be either the same value, the old value + 1, or 0 if we have a new producerId.
// 3. the last update time is no smaller than the old value.
// 4. the old partitions set is a subset of the new partitions set.
//
// plus, we should only try to update the metadata after the corresponding log entry has been successfully
// written and replicated (see TransactionStateManager#appendTransactionToLog)
//
// if valid, transition is done via overwriting the whole object to ensure synchronization
val toState = pendingState.getOrElse {
fatal(s"$this's transition to $transitMetadata failed since pendingState is not defined: this should not happen")
throw new IllegalStateException(s"TransactionalId $transactionalId " +
"completing transaction state transition while it does not have a pending state")
}
if (toState != transitMetadata.txnState) {
throwStateTransitionFailure(transitMetadata)
} else {
toState match {
case Empty => // from initPid
if ((producerEpoch != transitMetadata.producerEpoch && !validProducerEpochBump(transitMetadata)) ||
transitMetadata.topicPartitions.nonEmpty ||
transitMetadata.txnStartTimestamp != -1) {
throwStateTransitionFailure(transitMetadata)
}
case Ongoing => // from addPartitions
if (!validProducerEpoch(transitMetadata) ||
!topicPartitions.subsetOf(transitMetadata.topicPartitions) ||
txnTimeoutMs != transitMetadata.txnTimeoutMs) {
throwStateTransitionFailure(transitMetadata)
}
case PrepareAbort | PrepareCommit => // from endTxn
// In V2, we allow state transits from Empty, CompleteCommit and CompleteAbort to PrepareAbort. It is possible
// their updated start time is not equal to the current start time.
val allowedEmptyAbort = toState == PrepareAbort && transitMetadata.clientTransactionVersion.supportsEpochBump() &&
(state == Empty || state == CompleteCommit || state == CompleteAbort)
val validTimestamp = txnStartTimestamp == transitMetadata.txnStartTimestamp || allowedEmptyAbort
if (!validProducerEpoch(transitMetadata) ||
!topicPartitions.equals(transitMetadata.topicPartitions) ||
txnTimeoutMs != transitMetadata.txnTimeoutMs || !validTimestamp) {
throwStateTransitionFailure(transitMetadata)
}
case CompleteAbort | CompleteCommit => // from write markers
if (!validProducerEpoch(transitMetadata) ||
txnTimeoutMs != transitMetadata.txnTimeoutMs ||
transitMetadata.txnStartTimestamp == -1) {
throwStateTransitionFailure(transitMetadata)
}
case PrepareEpochFence =>
// We should never get here, since once we prepare to fence the epoch, we immediately set the pending state
// to PrepareAbort, and then consequently to CompleteAbort after the markers are written.. So we should never
// ever try to complete a transition to PrepareEpochFence, as it is not a valid previous state for any other state, and hence
// can never be transitioned out of.
throwStateTransitionFailure(transitMetadata)
case Dead =>
// The transactionalId was being expired. The completion of the operation should result in removal of the
// the metadata from the cache, so we should never realistically transition to the dead state.
throw new IllegalStateException(s"TransactionalId $transactionalId is trying to complete a transition to " +
s"$toState. This means that the transactionalId was being expired, and the only acceptable completion of " +
s"this operation is to remove the transaction metadata from the cache, not to persist the $toState in the log.")
}
debug(s"TransactionalId $transactionalId complete transition from $state to $transitMetadata")
producerId = transitMetadata.producerId
prevProducerId = transitMetadata.prevProducerId
nextProducerId = transitMetadata.nextProducerId
producerEpoch = transitMetadata.producerEpoch
lastProducerEpoch = transitMetadata.lastProducerEpoch
txnTimeoutMs = transitMetadata.txnTimeoutMs
topicPartitions = transitMetadata.topicPartitions
txnStartTimestamp = transitMetadata.txnStartTimestamp
txnLastUpdateTimestamp = transitMetadata.txnLastUpdateTimestamp
clientTransactionVersion = transitMetadata.clientTransactionVersion
pendingState = None
state = toState
}
}