in core/src/main/scala/kafka/server/KafkaApis.scala [387:548]
/**
 * Handle a produce request.
 *
 * Processing happens in this order:
 *  1. If the request carries transactional records, WRITE on the transactional id is required;
 *     otherwise an error response (TRANSACTIONAL_ID_AUTHORIZATION_FAILED) is sent and processing stops.
 *  2. Each topic's name/id is resolved through the metadata cache. Partitions whose topic name
 *     resolves to the empty string are answered with UNKNOWN_TOPIC_ID.
 *  3. Remaining partitions are checked for topic WRITE authorization, then for existence in the
 *     metadata cache, then their records are validated for the request's API version.
 *  4. Partitions that pass every check are handed to the replica manager. `sendResponseCallback`
 *     merges the replica manager's per-partition results with the locally generated errors,
 *     applies quota throttling and sends (or, for acks=0, suppresses) the response.
 *
 * Topic authorization is deliberately evaluated BEFORE the partition-existence check, so a caller
 * without WRITE on a topic receives TOPIC_AUTHORIZATION_FAILED whether or not the topic/partition
 * exists and therefore cannot use produce requests to probe for topic existence.
 *
 * @param request      the produce request taken from the request channel
 * @param requestLocal passed through unchanged to `replicaManager.handleProduceAppend`
 */
def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  val produceRequest = request.body[ProduceRequest]

  if (RequestUtils.hasTransactionalRecords(produceRequest)) {
    val isAuthorizedTransactional = produceRequest.transactionalId != null &&
      authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, produceRequest.transactionalId)
    if (!isAuthorizedTransactional) {
      requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
      return
    }
  }

  // Per-partition outcomes decided locally; they are merged into the final response by sendResponseCallback.
  val unauthorizedTopicResponses = mutable.Map[TopicIdPartition, PartitionResponse]()
  val nonExistingTopicResponses = mutable.Map[TopicIdPartition, PartitionResponse]()
  val invalidRequestResponses = mutable.Map[TopicIdPartition, PartitionResponse]()
  // Partitions that passed every check and will be appended.
  val authorizedRequestInfo = mutable.Map[TopicIdPartition, MemoryRecords]()
  // Partitions whose topic name could be resolved, paired with their request payload.
  val topicIdToPartitionData = new mutable.ArrayBuffer[(TopicIdPartition, ProduceRequestData.PartitionProduceData)]

  produceRequest.data.topicData.forEach { topic =>
    topic.partitionData.forEach { partition =>
      // A zero topic id means the request addressed the topic by name; otherwise resolve the name
      // from the id, falling back to whatever name the request carried.
      val (topicName, topicId) = if (topic.topicId().equals(Uuid.ZERO_UUID)) {
        (topic.name(), metadataCache.getTopicId(topic.name()))
      } else {
        (metadataCache.getTopicName(topic.topicId).orElse(topic.name), topic.topicId())
      }

      val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topicName, partition.index()))
      if (topicName.isEmpty)
        nonExistingTopicResponses += (topicIdPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_ID))
      else
        // Existence is intentionally NOT checked here: it must not be revealed before authorization.
        topicIdToPartitionData += (topicIdPartition -> partition)
    }
  }

  // cache the result to avoid redundant authorization calls
  val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, topicIdToPartitionData)(_._1.topic)

  topicIdToPartitionData.foreach { case (topicIdPartition, partition) =>
    // This caller assumes the type is MemoryRecords and that is true on current serialization
    // We cast the type to avoid causing big change to code base.
    // https://issues.apache.org/jira/browse/KAFKA-10698
    val memoryRecords = partition.records.asInstanceOf[MemoryRecords]
    if (!authorizedTopics.contains(topicIdPartition.topic))
      unauthorizedTopicResponses += (topicIdPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED))
    else if (!metadataCache.contains(topicIdPartition.topicPartition))
      // Only authorized callers learn that the topic/partition does not exist.
      nonExistingTopicResponses += (topicIdPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
    else
      try {
        ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords)
        authorizedRequestInfo += (topicIdPartition -> memoryRecords)
      } catch {
        case e: ApiException =>
          invalidRequestResponses += (topicIdPartition -> new PartitionResponse(Errors.forException(e)))
      }
  }

  // the callback for sending a produce response
  // The construction of ProduceResponse is able to accept auto-generated protocol data so
  // KafkaApis#handleProduceRequest should apply auto-generated protocol to avoid extra conversion.
  // https://issues.apache.org/jira/browse/KAFKA-10730
  @nowarn("cat=deprecation")
  def sendResponseCallback(responseStatus: Map[TopicIdPartition, PartitionResponse]): Unit = {
    val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
    var errorInResponse = false

    // Leader endpoints to return alongside NOT_LEADER_OR_FOLLOWER errors, keyed by node id.
    val nodeEndpoints = new mutable.HashMap[Int, Node]
    mergedResponseStatus.foreachEntry { (topicIdPartition, status) =>
      if (status.error != Errors.NONE) {
        errorInResponse = true
        debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
          request.header.correlationId,
          request.header.clientId,
          topicIdPartition,
          status.error.exceptionName))

        // Current-leader information is only filled in for request versions >= 10.
        if (request.header.apiVersion >= 10) {
          status.error match {
            case Errors.NOT_LEADER_OR_FOLLOWER =>
              val leaderNode = getCurrentLeader(topicIdPartition.topicPartition(), request.context.listenerName)
              leaderNode.node.foreach { node =>
                nodeEndpoints.put(node.id(), node)
              }
              status.currentLeader
                .setLeaderId(leaderNode.leaderId)
                .setLeaderEpoch(leaderNode.leaderEpoch)
            case _ =>
          }
        }
      }
    }

    // Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the quotas
    // have been violated. If both quotas have been violated, use the max throttle time between the two quotas. Note
    // that the request quota is not enforced if acks == 0.
    val timeMs = time.milliseconds()
    val requestSize = request.sizeInBytes
    val bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, requestSize, timeMs)
    val requestThrottleTimeMs =
      if (produceRequest.acks == 0) 0
      else quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
    val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
    if (maxThrottleTimeMs > 0) {
      request.apiThrottleTimeMs = maxThrottleTimeMs
      if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
        requestHelper.throttle(quotas.produce, request, bandwidthThrottleTimeMs)
      } else {
        requestHelper.throttle(quotas.request, request, requestThrottleTimeMs)
      }
    }

    // Send the response immediately. In case of throttling, the channel has already been muted.
    if (produceRequest.acks == 0) {
      // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
      // the request, since no response is expected by the producer, the server will close socket server so that
      // the producer client will know that some error has happened and will refresh its metadata
      if (errorInResponse) {
        val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
          topicPartition -> status.error.exceptionName
        }.mkString(", ")
        info(
          s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
            s"from client id ${request.header.clientId} with ack=0\n" +
            s"Topic and partition to exceptions: $exceptionsSummary"
        )
        requestChannel.closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
      } else {
        // Note that although request throttling is exempt for acks == 0, the channel may be throttled due to
        // bandwidth quota violation.
        requestHelper.sendNoOpResponseExemptThrottle(request)
      }
    } else {
      requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs, nodeEndpoints.values.toList.asJava), None)
    }
  }

  // Forwards the per-partition stats reported for the append to updateRecordConversionStats.
  def processingStatsCallback(processingStats: ProduceResponseStats): Unit = {
    processingStats.foreachEntry { (topicIdPartition, info) =>
      updateRecordConversionStats(request, topicIdPartition.topicPartition(), info)
    }
  }

  if (authorizedRequestInfo.isEmpty)
    // Nothing to append: respond right away with the locally generated per-partition errors.
    sendResponseCallback(Map.empty)
  else {
    val internalTopicsAllowed = request.header.clientId == "__admin_client"
    val transactionSupportedOperation = AddPartitionsToTxnManager.produceRequestVersionToTransactionSupportedOperation(request.header.apiVersion())
    // call the replica manager to append messages to the replicas
    replicaManager.handleProduceAppend(
      timeout = produceRequest.timeout.toLong,
      requiredAcks = produceRequest.acks,
      internalTopicsAllowed = internalTopicsAllowed,
      transactionalId = produceRequest.transactionalId,
      entriesPerPartition = authorizedRequestInfo,
      responseCallback = sendResponseCallback,
      recordValidationStatsCallback = processingStatsCallback,
      requestLocal = requestLocal,
      transactionSupportedOperation = transactionSupportedOperation)

    // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
    // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
    produceRequest.clearPartitionRecords()
  }
}