def completeTransitionTo()

in core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala [471:562]


  def completeTransitionTo(transitMetadata: TxnTransitMetadata): Unit = {
    // metadata transition is valid only if all the following conditions are met:
    //
    // 1. the new state is already indicated in the pending state.
    // 2. the epoch should be either the same value, the old value + 1, or 0 if we have a new producerId.
    // 3. the last update time is no smaller than the old value.
    // 4. the old partitions set is a subset of the new partitions set.
    //
    // plus, we should only try to update the metadata after the corresponding log entry has been successfully
    // written and replicated (see TransactionStateManager#appendTransactionToLog)
    //
    // if valid, transition is done via overwriting the whole object to ensure synchronization

    val toState = pendingState.getOrElse {
      fatal(s"$this's transition to $transitMetadata failed since pendingState is not defined: this should not happen")

      throw new IllegalStateException(s"TransactionalId $transactionalId " +
        "completing transaction state transition while it does not have a pending state")
    }

    if (toState != transitMetadata.txnState) {
      throwStateTransitionFailure(transitMetadata)
    } else {
      toState match {
        case Empty => // from initPid
          if ((producerEpoch != transitMetadata.producerEpoch && !validProducerEpochBump(transitMetadata)) ||
            transitMetadata.topicPartitions.nonEmpty ||
            transitMetadata.txnStartTimestamp != -1) {

            throwStateTransitionFailure(transitMetadata)
          }

        case Ongoing => // from addPartitions
          if (!validProducerEpoch(transitMetadata) ||
            !topicPartitions.subsetOf(transitMetadata.topicPartitions) ||
            txnTimeoutMs != transitMetadata.txnTimeoutMs) {

            throwStateTransitionFailure(transitMetadata)
          }

        case PrepareAbort | PrepareCommit => // from endTxn
          // In V2, we allow state transits from Empty, CompleteCommit and CompleteAbort to PrepareAbort. It is possible
          // their updated start time is not equal to the current start time.
          val allowedEmptyAbort = toState == PrepareAbort && transitMetadata.clientTransactionVersion.supportsEpochBump() &&
            (state == Empty || state == CompleteCommit || state == CompleteAbort)
          val validTimestamp = txnStartTimestamp == transitMetadata.txnStartTimestamp || allowedEmptyAbort
          if (!validProducerEpoch(transitMetadata) ||
            !topicPartitions.equals(transitMetadata.topicPartitions) ||
            txnTimeoutMs != transitMetadata.txnTimeoutMs || !validTimestamp) {

            throwStateTransitionFailure(transitMetadata)
          }

        case CompleteAbort | CompleteCommit => // from write markers
          if (!validProducerEpoch(transitMetadata) ||
            txnTimeoutMs != transitMetadata.txnTimeoutMs ||
            transitMetadata.txnStartTimestamp == -1) {
            throwStateTransitionFailure(transitMetadata)
          }

        case PrepareEpochFence =>
          // We should never get here, since once we prepare to fence the epoch, we immediately set the pending state
          // to PrepareAbort, and then consequently to CompleteAbort after the markers are written.. So we should never
          // ever try to complete a transition to PrepareEpochFence, as it is not a valid previous state for any other state, and hence
          // can never be transitioned out of.
          throwStateTransitionFailure(transitMetadata)


        case Dead =>
          // The transactionalId was being expired. The completion of the operation should result in removal of the
          // the metadata from the cache, so we should never realistically transition to the dead state.
          throw new IllegalStateException(s"TransactionalId $transactionalId is trying to complete a transition to " +
            s"$toState. This means that the transactionalId was being expired, and the only acceptable completion of " +
            s"this operation is to remove the transaction metadata from the cache, not to persist the $toState in the log.")
      }

      debug(s"TransactionalId $transactionalId complete transition from $state to $transitMetadata")
      producerId = transitMetadata.producerId
      prevProducerId = transitMetadata.prevProducerId
      nextProducerId = transitMetadata.nextProducerId
      producerEpoch = transitMetadata.producerEpoch
      lastProducerEpoch = transitMetadata.lastProducerEpoch
      txnTimeoutMs = transitMetadata.txnTimeoutMs
      topicPartitions = transitMetadata.topicPartitions
      txnStartTimestamp = transitMetadata.txnStartTimestamp
      txnLastUpdateTimestamp = transitMetadata.txnLastUpdateTimestamp
      clientTransactionVersion = transitMetadata.clientTransactionVersion

      pendingState = None
      state = toState
    }
  }