in core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala [351:441]
/**
 * Completes the pending state transition by applying `transitMetadata` to this object's fields.
 *
 * The transition is accepted only when all of the following hold:
 *
 *  1. a pending state is set and it equals `transitMetadata.txnState`;
 *  2. the producer epoch is acceptable, as decided by `validProducerEpoch` (or, for `Empty`, the epoch
 *     is unchanged or accepted by `validProducerEpochBump`);
 *  3. the state-specific checks on topic partitions, transaction timeout and start timestamp pass
 *     (see the individual cases below).
 *
 * This should only be called after the corresponding log entry has been successfully written and
 * replicated (see TransactionStateManager#appendTransactionToLog).
 *
 * On success the state-specific fields are updated in place, `txnLastUpdateTimestamp` is taken from
 * `transitMetadata`, `pendingState` is cleared and `state` becomes the former pending state.
 * Every rejected transition is reported through `throwStateTransitionFailure`
 * (presumably throws; its definition is outside this block).
 *
 * NOTE(review): earlier documentation also listed "the last update time is no smaller than the old
 * value" as a condition; no such comparison is made in this method. Confirm whether it is enforced
 * elsewhere.
 *
 * @param transitMetadata the metadata describing the state being transitioned to
 * @throws IllegalStateException if no pending state is defined, or if the pending state is `Dead`
 */
def completeTransitionTo(transitMetadata: TxnTransitMetadata): Unit = {
  val toState = pendingState match {
    case Some(pending) => pending
    case None =>
      // Completing a transition without having prepared one is a programming error.
      fatal(s"$this's transition to $transitMetadata failed since pendingState is not defined: this should not happen")
      throw new IllegalStateException(s"TransactionalId $transactionalId " +
        "completing transaction state transition while it does not have a pending state")
  }

  if (toState == transitMetadata.txnState) {
    toState match {
      case Empty => // reached via initPid
        // The epoch must be unchanged or a valid bump, and no transaction may be in flight.
        val acceptable =
          (producerEpoch == transitMetadata.producerEpoch || validProducerEpochBump(transitMetadata)) &&
            transitMetadata.topicPartitions.isEmpty &&
            transitMetadata.txnStartTimestamp == -1
        if (acceptable) {
          txnTimeoutMs = transitMetadata.txnTimeoutMs
          producerEpoch = transitMetadata.producerEpoch
          lastProducerEpoch = transitMetadata.lastProducerEpoch
          producerId = transitMetadata.producerId
          lastProducerId = transitMetadata.lastProducerId
        } else {
          throwStateTransitionFailure(transitMetadata)
        }

      case Ongoing => // reached via addPartitions
        // Partitions may only grow: the current set has to be contained in the new one.
        val acceptable =
          validProducerEpoch(transitMetadata) &&
            topicPartitions.subsetOf(transitMetadata.topicPartitions) &&
            txnTimeoutMs == transitMetadata.txnTimeoutMs
        if (acceptable) {
          txnStartTimestamp = transitMetadata.txnStartTimestamp
          addPartitions(transitMetadata.topicPartitions)
        } else {
          throwStateTransitionFailure(transitMetadata)
        }

      case PrepareAbort | PrepareCommit => // reached via endTxn
        // Nothing besides the state itself may change: partitions, timeout and start time must match.
        val unchanged =
          validProducerEpoch(transitMetadata) &&
            topicPartitions.toSet == transitMetadata.topicPartitions &&
            txnTimeoutMs == transitMetadata.txnTimeoutMs &&
            txnStartTimestamp == transitMetadata.txnStartTimestamp
        if (!unchanged) {
          throwStateTransitionFailure(transitMetadata)
        }

      case CompleteAbort | CompleteCommit => // reached after the markers were written
        // A completed transaction must carry a start timestamp (-1 means "not set").
        val acceptable =
          validProducerEpoch(transitMetadata) &&
            txnTimeoutMs == transitMetadata.txnTimeoutMs &&
            transitMetadata.txnStartTimestamp != -1
        if (acceptable) {
          txnStartTimestamp = transitMetadata.txnStartTimestamp
          topicPartitions.clear()
        } else {
          throwStateTransitionFailure(transitMetadata)
        }

      case PrepareEpochFence =>
        // We should never get here: once we prepare to fence the epoch, the pending state is immediately
        // set to PrepareAbort, and then to CompleteAbort after the markers are written. PrepareEpochFence
        // is not a valid previous state for any other state, so a transition to it is never completed
        // and it can never be transitioned out of.
        throwStateTransitionFailure(transitMetadata)

      case Dead =>
        // The transactionalId was being expired. Completing that operation should remove the metadata
        // from the cache, so a transition to the dead state is never completed here.
        throw new IllegalStateException(s"TransactionalId $transactionalId is trying to complete a transition to " +
          s"$toState. This means that the transactionalId was being expired, and the only acceptable completion of " +
          s"this operation is to remove the transaction metadata from the cache, not to persist the $toState in the log.")
    }

    // Log before `state` is overwritten so the message still shows the state being left.
    debug(s"TransactionalId $transactionalId complete transition from $state to $transitMetadata")
    txnLastUpdateTimestamp = transitMetadata.txnLastUpdateTimestamp
    pendingState = None
    state = toState
  } else {
    // The completed state does not match the one that was prepared.
    throwStateTransitionFailure(transitMetadata)
  }
}