in core/src/main/scala/kafka/network/RequestChannel.scala [55:297]
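// Request logging is enabled for every API when the request logger is at DEBUG, and only for deprecated API
// versions when it is at INFO, so deprecated usage can be surfaced without logging all requests.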
private def isRequestLoggingEnabled(header: RequestHeader): Boolean = requestLogger.underlying.isDebugEnabled ||
(requestLogger.underlying.isInfoEnabled && header.isApiVersionDeprecated())
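// Everything that can be placed on the request queue: client requests, internal callback requests, and the
// ShutdownRequest/WakeupRequest control signals.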
sealed trait BaseRequest
case object ShutdownRequest extends BaseRequest
case object WakeupRequest extends BaseRequest
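// Defers `fun` so that it runs when dequeued by a request handler thread; `originalRequest` carries the context
// and metrics of the request that scheduled the callback.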
case class CallbackRequest(fun: RequestLocal => Unit,
originalRequest: Request) extends BaseRequest
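// Wraps a parsed client request together with its connection context, the pooled receive buffer it was read into,
// and the volatile timestamps used to compute per-request metrics.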
class Request(val processor: Int,
val context: RequestContext,
val startTimeNanos: Long,
val memoryPool: MemoryPool,
@volatile var buffer: ByteBuffer,
metrics: RequestChannelMetrics,
val envelope: Option[RequestChannel.Request] = None) extends BaseRequest {
// These need to be volatile because the readers are in the network thread and the writers are in the request
// handler threads or the purgatory threads
@volatile var requestDequeueTimeNanos: Long = -1L
@volatile var apiLocalCompleteTimeNanos: Long = -1L
@volatile var responseCompleteTimeNanos: Long = -1L
@volatile var responseDequeueTimeNanos: Long = -1L
@volatile var messageConversionsTimeNanos: Long = 0L
@volatile var apiThrottleTimeMs: Long = 0L
@volatile var temporaryMemoryBytes: Long = 0L
@volatile var recordNetworkThreadTimeCallback: Option[java.util.function.Consumer[java.lang.Long]] = None
@volatile var callbackRequestDequeueTimeNanos: Option[Long] = None
@volatile var callbackRequestCompleteTimeNanos: Option[Long] = None
val session: Session = new Session(context.principal, context.clientAddress)
private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
// This is constructed on creation of a Request so that the JSON representation is computed before the request is
// processed by the API layer. Otherwise, a ProduceRequest could be logged without its data (i.e. once it has gone
// into purgatory).
val requestLog: Option[JsonNode] =
if (RequestChannel.isRequestLoggingEnabled(context.header)) Some(RequestConvertToJson.request(loggableRequest))
else None
def header: RequestHeader = context.header
private def sizeOfBodyInBytes: Int = bodyAndSize.size
def sizeInBytes: Int = header.size + sizeOfBodyInBytes
// Most request types are parsed entirely into objects at this point, so for those we can release the underlying
// buffer. Some (like produce, or any request whose schema contains fields of type BYTES or NULLABLE_BYTES) retain a
// reference to the buffer; for those requests we cannot release the buffer early, only once processing is done.
if (!header.apiKey.requiresDelayedAllocation) {
releaseBuffer()
}
def isForwarded: Boolean = envelope.isDefined
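// Whether the response indicates this node is no longer the active controller: DescribeQuorum reports that as
// NOT_LEADER_OR_FOLLOWER, every other API as NOT_CONTROLLER.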
private def shouldReturnNotController(response: AbstractResponse): Boolean = {
response match {
case _: DescribeQuorumResponse => response.errorCounts.containsKey(Errors.NOT_LEADER_OR_FOLLOWER)
case _ => response.errorCounts.containsKey(Errors.NOT_CONTROLLER)
}
}
def buildResponseSend(abstractResponse: AbstractResponse): Send = {
envelope match {
case Some(request) =>
val envelopeResponse = if (shouldReturnNotController(abstractResponse)) {
// Since this is a NOT_CONTROLLER error response, build the envelope response with the NOT_CONTROLLER error
// so that the requester (i.e. NodeToControllerRequestThread) knows to look up the active controller again.
new EnvelopeResponse(new EnvelopeResponseData()
.setErrorCode(Errors.NOT_CONTROLLER.code()))
} else {
val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
new EnvelopeResponse(responseBytes, Errors.NONE)
}
request.context.buildResponseSend(envelopeResponse)
case None =>
context.buildResponseSend(abstractResponse)
}
}
def responseNode(response: AbstractResponse): Option[JsonNode] = {
if (RequestChannel.isRequestLoggingEnabled(context.header))
Some(RequestConvertToJson.response(response, context.apiVersion))
else
None
}
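// Returns the header of the request as it appeared on the wire: the envelope's header for forwarded requests,
// this request's own header otherwise.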
def headerForLoggingOrThrottling(): RequestHeader = {
envelope match {
case Some(request) =>
request.context.header
case None =>
context.header
}
}
def requestDesc(details: Boolean): String = {
val forwardDescription = envelope.map { request =>
s"Forwarded request: ${request.context} "
}.getOrElse("")
s"$forwardDescription$header -- ${loggableRequest.toString(details)}"
}
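// Casts the parsed body to the expected request type, e.g. body[FetchRequest].isFromFollower as used in
// updateRequestMetrics below; any other type results in a ClassCastException.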
def body[T <: AbstractRequest](implicit classTag: ClassTag[T]): T = {
bodyAndSize.request match {
case r: T => r
case r =>
throw new ClassCastException(s"Expected request with type ${classTag.runtimeClass}, but found ${r.getClass}")
}
}
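// Returns the request body with sensitive config values redacted via KafkaConfig.loggableValue, so that
// AlterConfigs and IncrementalAlterConfigs requests can be logged safely; other request types are returned as-is.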
def loggableRequest: AbstractRequest = {
bodyAndSize.request match {
case alterConfigs: AlterConfigsRequest =>
val newData = alterConfigs.data().duplicate()
newData.resources().forEach(resource => {
val resourceType = ConfigResource.Type.forId(resource.resourceType())
resource.configs().forEach(config => {
config.setValue(KafkaConfig.loggableValue(resourceType, config.name(), config.value()))
})
})
new AlterConfigsRequest(newData, alterConfigs.version())
case alterConfigs: IncrementalAlterConfigsRequest =>
val newData = alterConfigs.data().duplicate()
newData.resources().forEach(resource => {
val resourceType = ConfigResource.Type.forId(resource.resourceType())
resource.configs().forEach(config => {
config.setValue(KafkaConfig.loggableValue(resourceType, config.name(), config.value()))
})
})
new IncrementalAlterConfigsRequest.Builder(newData).build(alterConfigs.version())
case _ =>
bodyAndSize.request
}
}
trace(s"Processor $processor received request: ${requestDesc(true)}")
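// Time spent on a request handler thread for this request; if local processing has not completed yet, the current
// time is used as the completion time.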
def requestThreadTimeNanos: Long = {
if (apiLocalCompleteTimeNanos == -1L) apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L)
}
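// The metrics below decompose the request lifecycle using the volatile timestamps recorded on this request:
//   startTimeNanos            -> requestDequeueTimeNanos    = request queue time
//   requestDequeueTimeNanos   -> apiLocalCompleteTimeNanos  = local (api handler) time, plus any callback time
//   apiLocalCompleteTimeNanos -> responseCompleteTimeNanos  = remote time (e.g. purgatory), minus callback time
//   responseCompleteTimeNanos -> responseDequeueTimeNanos   = response queue time
//   responseDequeueTimeNanos  -> end of send                = response send time
//   startTimeNanos            -> end of send                = total time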
def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = {
val endTimeNanos = Time.SYSTEM.nanoseconds
/**
* Converts nanos to millis with micros precision, since additional decimal places in the request log have a low
* signal-to-noise ratio. For metrics there is little difference either way, as we round the value to the nearest long.
*/
def nanosToMs(nanos: Long): Double = {
val positiveNanos = math.max(nanos, 0)
TimeUnit.NANOSECONDS.toMicros(positiveNanos).toDouble / TimeUnit.MILLISECONDS.toMicros(1)
}
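// e.g. nanosToMs(1234567L) == 1.234: the value is truncated to micros before being converted to millis.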
val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
val callbackRequestTimeNanos = callbackRequestCompleteTimeNanos.getOrElse(0L) - callbackRequestDequeueTimeNanos.getOrElse(0L)
val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos + callbackRequestTimeNanos)
val apiRemoteTimeMs = nanosToMs(responseCompleteTimeNanos - apiLocalCompleteTimeNanos - callbackRequestTimeNanos)
val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
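// Fetch requests are recorded under a follower- or consumer-specific metric name in addition to the generic
// Fetch name; verify-only AddPartitionsToTxn requests are recorded only under the verification metric name.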
val overrideMetricNames =
if (header.apiKey == ApiKeys.FETCH) {
val specifiedMetricName =
if (body[FetchRequest].isFromFollower) RequestMetrics.FOLLOW_FETCH_METRIC_NAME
else RequestMetrics.CONSUMER_FETCH_METRIC_NAME
Seq(specifiedMetricName, header.apiKey.name)
} else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && body[AddPartitionsToTxnRequest].allVerifyOnlyRequest) {
Seq(RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME)
} else {
Seq(header.apiKey.name)
}
overrideMetricNames.foreach { metricName =>
val m = metrics(metricName)
m.requestRate(header.apiVersion).mark()
m.deprecatedRequestRate(header.apiKey, header.apiVersion, context.clientInformation).ifPresent(_.mark())
m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
m.localTimeHist.update(Math.round(apiLocalTimeMs))
m.remoteTimeHist.update(Math.round(apiRemoteTimeMs))
m.throttleTimeHist.update(apiThrottleTimeMs)
m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs))
m.responseSendTimeHist.update(Math.round(responseSendTimeMs))
m.totalTimeHist.update(Math.round(totalTimeMs))
m.requestBytesHist.update(sizeOfBodyInBytes)
m.messageConversionsTimeHist.ifPresent(_.update(Math.round(messageConversionsTimeMs)))
m.tempMemoryBytesHist.ifPresent(_.update(temporaryMemoryBytes))
}
// Records network handler thread usage. This is counted towards the request quota for the
// user/client. Throttling is only performed when request handler thread usage
// is recorded, just before responses are queued for delivery.
// The time recorded here is the time spent on the network thread for receiving this request
// and sending the response. Note that for the first request on a connection, the time includes
// the total time spent on authentication, which may be significant for SASL/SSL.
recordNetworkThreadTimeCallback.foreach(record => record.accept(networkThreadTimeNanos))
if (isRequestLoggingEnabled(header)) {
val desc = RequestConvertToJson.requestDescMetrics(header, requestLog.toJava, response.responseLog.toJava,
context, session, isForwarded,
totalTimeMs, requestQueueTimeMs, apiLocalTimeMs,
apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs,
responseSendTimeMs, temporaryMemoryBytes,
messageConversionsTimeMs)
val logPrefix = "Completed request:{}"
// Log deprecated APIs at `info` level so that their request logging can be enabled selectively
if (header.isApiVersionDeprecated())
requestLogger.info(logPrefix, desc)
else
requestLogger.debug(logPrefix, desc)
}
}
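// Returns the receive buffer to the memory pool; a forwarded request delegates to its envelope, which owns the buffer.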
def releaseBuffer(): Unit = {
envelope match {
case Some(request) =>
request.releaseBuffer()
case None =>
if (buffer != null) {
memoryPool.release(buffer)
buffer = null
}
}
}
def setRecordNetworkThreadTimeCallback(callback: java.util.function.Consumer[java.lang.Long]): Unit = {
recordNetworkThreadTimeCallback = Some(callback)
}
override def toString: String = s"Request(processor=$processor, " +
s"connectionId=${context.connectionId}, " +
s"session=$session, " +
s"listenerName=${context.listenerName}, " +
s"securityProtocol=${context.securityProtocol}, " +
s"buffer=$buffer, " +
s"envelope=$envelope)"
}