in core/src/main/scala/kafka/server/KafkaApis.scala [1603:1774]
def handleWriteTxnMarkersRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
// We check for the AlterCluster (ALTER on CLUSTER) permission first. If it is not granted, we fall
// back to authorizing the ClusterAction operation; the latter throws an exception if it is denied.
if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
}
val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest]
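// Collected results, keyed by producerId; each value maps the partitions touched by that
// producer's markers to the outcome of the corresponding append.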
val errors = new ConcurrentHashMap[java.lang.Long, util.Map[TopicPartition, Errors]]()
val markers = writeTxnMarkersRequest.markers
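// One outstanding "append" per marker; the response is sent once every marker has either
// completed its append(s) or been skipped.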
val numAppends = new AtomicInteger(markers.size)
if (numAppends.get == 0) {
requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
return
}
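// Merge the per-partition errors for a producerId into the shared result map. putIfAbsent keeps
// the first map registered for the producer; later calls fold their entries into it.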
def updateErrors(producerId: Long, currentErrors: ConcurrentHashMap[TopicPartition, Errors]): Unit = {
val previousErrors = errors.putIfAbsent(producerId, currentErrors)
if (previousErrors != null)
previousErrors.putAll(currentErrors)
}
/**
* This is the callback invoked when a log append of transaction markers succeeds. It can be called multiple
* times when handling a single WriteTxnMarkersRequest because there is one append per TransactionMarker in the
* request, and therefore possibly multiple appends of markers to the log. The final response will be sent only
* after all appends have returned.
*/
def maybeSendResponseCallback(producerId: Long, result: TransactionResult, currentErrors: ConcurrentHashMap[TopicPartition, Errors]): Unit = {
trace(s"End transaction marker append for producer id $producerId completed with status: $currentErrors")
updateErrors(producerId, currentErrors)
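// Send the final response once all markers have been accounted for.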
def maybeSendResponse(): Unit = {
if (numAppends.decrementAndGet() == 0) {
requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
}
}
// The new group coordinator uses GroupCoordinator#completeTransaction so we do
// not need to call GroupCoordinator#onTransactionCompleted here.
if (config.isNewGroupCoordinatorEnabled) {
maybeSendResponse()
return
}
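// Only markers successfully written to __consumer_offsets partitions are relevant to the old
// group coordinator, so filter for those.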
val successfulOffsetsPartitions = currentErrors.asScala.filter { case (topicPartition, error) =>
topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == Errors.NONE
}.keys
// If no end transaction marker has been written to a __consumer_offsets partition, we do not
// need to call GroupCoordinator#onTransactionCompleted.
if (successfulOffsetsPartitions.isEmpty) {
maybeSendResponse()
return
}
// Otherwise, we call GroupCoordinator#onTransactionCompleted to materialize the offsets
// into the cache and wait until the materialization has completed.
groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result).whenComplete { (_, exception) =>
if (exception != null) {
error(s"Received an exception while trying to update the offsets cache on transaction marker append", exception)
val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
updateErrors(producerId, updatedErrors)
}
maybeSendResponse()
}
}
// TODO: The current append API makes doing separate writes per producerId a little easier, but it would
// be nice to have only one append to the log. This requires pushing the building of the control records
// into Log so that we only append those having a valid producer epoch, and exposing a new appendControlRecord
// API in ReplicaManager. For now, we've taken the simpler approach.
var skippedMarkers = 0
for (marker <- markers.asScala) {
val producerId = marker.producerId
val partitionsWithCompatibleMessageFormat = new mutable.ArrayBuffer[TopicPartition]
val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]()
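// Only partitions that are online on this broker can take the marker; the others are failed
// immediately with UNKNOWN_TOPIC_OR_PARTITION.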
marker.partitions.forEach { partition =>
replicaManager.onlinePartition(partition) match {
case Some(_) =>
partitionsWithCompatibleMessageFormat += partition
case None =>
currentErrors.put(partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
}
if (!currentErrors.isEmpty)
updateErrors(producerId, currentErrors)
if (partitionsWithCompatibleMessageFormat.isEmpty) {
numAppends.decrementAndGet()
skippedMarkers += 1
} else {
val controlRecordType = marker.transactionResult match {
case TransactionResult.COMMIT => ControlRecordType.COMMIT
case TransactionResult.ABORT => ControlRecordType.ABORT
}
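// Per-marker aggregation: collect one result per partition and invoke the marker-level
// callback only after the last partition has reported back.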
val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()
val numPartitions = new AtomicInteger(partitionsWithCompatibleMessageFormat.size)
def addResultAndMaybeComplete(partition: TopicPartition, error: Errors): Unit = {
markerResults.put(partition, error)
// maybeSendResponseCallback must be called only once per marker; otherwise the response
// could be sent prematurely.
if (numPartitions.decrementAndGet() == 0) {
maybeSendResponseCallback(producerId, marker.transactionResult, markerResults)
}
}
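// Control batches to append via ReplicaManager, i.e. every partition whose end marker is not
// delegated to the new group coordinator.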
val controlRecords = mutable.Map.empty[TopicPartition, MemoryRecords]
partitionsWithCompatibleMessageFormat.foreach { partition =>
if (groupCoordinator.isNewGroupCoordinator && partition.topic == GROUP_METADATA_TOPIC_NAME) {
// When the new group coordinator is used, writing the end marker is fully delegated
// to the group coordinator.
groupCoordinator.completeTransaction(
partition,
marker.producerId,
marker.producerEpoch,
marker.coordinatorEpoch,
marker.transactionResult,
Duration.ofMillis(config.requestTimeoutMs.toLong)
).whenComplete { (_, exception) =>
val error = if (exception == null) {
Errors.NONE
} else {
Errors.forException(exception) match {
case Errors.COORDINATOR_NOT_AVAILABLE | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.NOT_COORDINATOR =>
// The transaction coordinator does not expect these errors, so we translate them to
// NOT_LEADER_OR_FOLLOWER to signal that the group coordinator is not ready yet.
Errors.NOT_LEADER_OR_FOLLOWER
case error =>
error
}
}
addResultAndMaybeComplete(partition, error)
}
} else {
// Otherwise, the regular appendRecords path is used for all non-__consumer_offsets
// partitions, or for all partitions when the new group coordinator is disabled.
controlRecords += partition -> MemoryRecords.withEndTransactionMarker(
producerId,
marker.producerEpoch,
new EndTransactionMarker(controlRecordType, marker.coordinatorEpoch)
)
}
}
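// Append the collected control batches with acks=-1 (all in-sync replicas) and report each
// partition's result through the per-marker aggregation above.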
if (controlRecords.nonEmpty) {
replicaManager.appendRecords(
timeout = config.requestTimeoutMs.toLong,
requiredAcks = -1,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = controlRecords,
requestLocal = requestLocal,
responseCallback = errors => {
errors.foreachEntry { (tp, partitionResponse) =>
addResultAndMaybeComplete(tp, partitionResponse.error)
}
}
)
}
}
}
// No appends were attempted because none of the markers had an online partition on this broker,
// so no append callbacks will fire and we need to send the error response here.
if (skippedMarkers == markers.size)
requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
}