in core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala [463:627]
/**
 * Serializes `newMetadata` for `transactionalId`, appends it to the transaction state topic partition
 * that owns the id, and reports the outcome through `responseCallback`.
 *
 * Before appending, the cached metadata is looked up under the state read lock: if the lookup fails, the
 * entry is gone, or the cached coordinator epoch differs from `coordinatorEpoch`, the callback is invoked
 * immediately and nothing is appended. After the append completes, the same checks are repeated before the
 * cached metadata is transitioned to `newMetadata`.
 *
 * @param transactionalId  the transactional id whose metadata is being written
 * @param coordinatorEpoch the coordinator epoch observed by the caller; compared against the cached epoch
 *                         both before the append and when the append completes
 * @param newMetadata      the transit metadata to write; its `txnTimeoutMs` is passed as the append timeout
 * @param responseCallback receives `Errors.NONE` when the append succeeded and the cache was updated,
 *                         otherwise the (translated) error
 * @param retryOnError     predicate applied to the translated append error; when it returns true the pending
 *                         state is left untouched so the caller can retry. Defaults to never retrying.
 */
def appendTransactionToLog(transactionalId: String,
                           coordinatorEpoch: Int,
                           newMetadata: TxnTransitMetadata,
                           responseCallback: Errors => Unit,
                           retryOnError: Errors => Boolean = _ => false): Unit = {

  // generate the message for this transaction metadata
  val keyBytes = TransactionLog.keyToBytes(transactionalId)
  val valueBytes = TransactionLog.valueToBytes(newMetadata)
  val timestamp = time.milliseconds()

  val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, new SimpleRecord(timestamp, keyBytes, valueBytes))
  val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(transactionalId))
  val recordsPerPartition = Map(topicPartition -> records)

  // set the callback function to update transaction status in cache after log append completed
  def updateCacheCallback(responseStatus: collection.Map[TopicPartition, PartitionResponse]): Unit = {
    // the append response should only contain the single transaction topic partition we wrote to
    if (responseStatus.size != 1 || !responseStatus.contains(topicPartition))
      throw new IllegalStateException(s"Append status $responseStatus should only have one partition $topicPartition")

    val status = responseStatus(topicPartition)

    // responseError starts as the translated append error and may be overridden below when the append
    // succeeded but the cache can no longer accept the transition
    var responseError = if (status.error == Errors.NONE) {
      Errors.NONE
    } else {
      debug(s"Appending $transactionalId's new metadata $newMetadata failed due to ${status.error.exceptionName}")

      // transform the log append error code to the corresponding coordinator error code
      status.error match {
        case Errors.UNKNOWN_TOPIC_OR_PARTITION
             | Errors.NOT_ENOUGH_REPLICAS
             | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND
             | Errors.REQUEST_TIMED_OUT => // note that for timed out request we return NOT_AVAILABLE error code to let client retry
          Errors.COORDINATOR_NOT_AVAILABLE

        case Errors.NOT_LEADER_FOR_PARTITION
             | Errors.KAFKA_STORAGE_ERROR =>
          Errors.NOT_COORDINATOR

        case Errors.MESSAGE_TOO_LARGE
             | Errors.RECORD_LIST_TOO_LARGE =>
          Errors.UNKNOWN_SERVER_ERROR

        case other =>
          other
      }
    }

    if (responseError == Errors.NONE) {
      // now try to update the cache: we need to update the status in-place instead of
      // overwriting the whole object to ensure synchronization
      getTransactionState(transactionalId) match {

        case Left(err) =>
          info(s"Accessing the cached transaction metadata for $transactionalId returns $err error; " +
            s"aborting transition to the new metadata and setting the error in the callback")
          responseError = err

        case Right(Some(epochAndMetadata)) =>
          val metadata = epochAndMetadata.transactionMetadata

          metadata.inLock {
            if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
              // the cache may have been changed due to txn topic partition emigration and immigration,
              // in this case directly return NOT_COORDINATOR to client and let it to re-discover the transaction coordinator
              info(s"The cached coordinator epoch for $transactionalId has changed to ${epochAndMetadata.coordinatorEpoch} after appended its new metadata $newMetadata " +
                s"to the transaction log (txn topic partition ${partitionFor(transactionalId)}) while it was $coordinatorEpoch before appending; " +
                s"aborting transition to the new metadata and returning ${Errors.NOT_COORDINATOR} in the callback")
              responseError = Errors.NOT_COORDINATOR
            } else {
              metadata.completeTransitionTo(newMetadata)
              debug(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId succeeded")
            }
          }

        case Right(None) =>
          // this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
          // return NOT_COORDINATOR to let the client re-discover the transaction coordinator
          info(s"The cached coordinator metadata does not exist in the cache anymore for $transactionalId after appended its new metadata $newMetadata " +
            s"to the transaction log (txn topic partition ${partitionFor(transactionalId)}) while it was $coordinatorEpoch before appending; " +
            s"aborting transition to the new metadata and returning ${Errors.NOT_COORDINATOR} in the callback")
          responseError = Errors.NOT_COORDINATOR
      }
    } else {
      // Reset the pending state when returning an error, since there is no active transaction for the transactional id at this point.
      getTransactionState(transactionalId) match {
        case Right(Some(epochAndTxnMetadata)) =>
          val metadata = epochAndTxnMetadata.transactionMetadata

          metadata.inLock {
            if (epochAndTxnMetadata.coordinatorEpoch == coordinatorEpoch) {
              if (retryOnError(responseError)) {
                info(s"TransactionalId ${metadata.transactionalId} append transaction log for $newMetadata transition failed due to $responseError, " +
                  s"not resetting pending state ${metadata.pendingState} but just returning the error in the callback to let the caller retry")
              } else {
                info(s"TransactionalId ${metadata.transactionalId} append transaction log for $newMetadata transition failed due to $responseError, " +
                  s"resetting pending state from ${metadata.pendingState}, aborting state transition and returning $responseError in the callback")

                metadata.pendingState = None
              }
            } else {
              // `coordinatorEpoch` is the epoch the caller held before the append; the cached value is the
              // current one, so the change is reported as "from caller's epoch to cached epoch"
              info(s"TransactionalId ${metadata.transactionalId} append transaction log for $newMetadata transition failed due to $responseError, " +
                s"aborting state transition and returning the error in the callback since the coordinator epoch has changed from $coordinatorEpoch to ${epochAndTxnMetadata.coordinatorEpoch}")
            }
          }

        case Right(None) =>
          // Do nothing here, since we want to return the original append error to the user.
          info(s"TransactionalId $transactionalId append transaction log for $newMetadata transition failed due to $responseError, " +
            s"aborting state transition and returning the error in the callback since metadata is not available in the cache anymore")

        case Left(error) =>
          // Do nothing here, since we want to return the original append error to the user.
          info(s"TransactionalId $transactionalId append transaction log for $newMetadata transition failed due to $responseError, " +
            s"aborting state transition and returning the error in the callback since retrieving metadata returned $error")
      }
    }

    responseCallback(responseError)
  }

  inReadLock(stateLock) {
    // we need to hold the read lock on the transaction metadata cache until appending to local log returns;
    // this is to avoid the case where an emigration followed by an immigration could have completed after the check
    // returns and before appendRecords() is called, since otherwise entries with a high coordinator epoch could have
    // been appended to the log in between these two events, and therefore appendRecords() would append entries with
    // an old coordinator epoch that can still be successfully replicated on followers and make the log in a bad state.
    getTransactionState(transactionalId) match {
      case Left(err) =>
        responseCallback(err)

      case Right(None) =>
        // the coordinator metadata has been removed, reply to client immediately with NOT_COORDINATOR
        responseCallback(Errors.NOT_COORDINATOR)

      case Right(Some(epochAndMetadata)) =>
        val metadata = epochAndMetadata.transactionMetadata

        // decide under the metadata lock whether to append; the append itself happens after the lock is released
        val append: Boolean = metadata.inLock {
          if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
            // the coordinator epoch has changed, reply to client immediately with NOT_COORDINATOR
            responseCallback(Errors.NOT_COORDINATOR)
            false
          } else {
            // do not need to check the metadata object itself since no concurrent thread should be able to modify it
            // under the same coordinator epoch, so directly append to txn log now
            true
          }
        }

        if (append) {
          replicaManager.appendRecords(
            newMetadata.txnTimeoutMs.toLong,
            TransactionLog.EnforcedRequiredAcks,
            internalTopicsAllowed = true,
            isFromClient = false,
            recordsPerPartition,
            updateCacheCallback,
            delayedProduceLock = Some(stateLock.readLock))

          trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log")
        }
    }
  }
}