in core/src/main/scala/kafka/server/KafkaApis.scala [358:489]
/**
 * Handles an OffsetCommit request and sends exactly one response for it.
 *
 * The request is processed in this order:
 *  - If the caller is not authorized for READ on the group, every topic in the request is answered with
 *    GROUP_AUTHORIZATION_FAILED.
 *  - If the request carries a group instance id while the inter-broker protocol version is below
 *    KAFKA_2_3_IV0, every partition is answered with UNSUPPORTED_VERSION.
 *  - Otherwise each partition is classified as unauthorized (TOPIC_AUTHORIZATION_FAILED), absent from the
 *    metadata cache (UNKNOWN_TOPIC_OR_PARTITION), or committable. Committable partitions are written through
 *    `zkClient` when the request version is 0, and handed to the group coordinator for any later version.
 *
 * @param request the incoming request; its body is read as an `OffsetCommitRequest`
 */
def handleOffsetCommitRequest(request: RequestChannel.Request): Unit = {
  val header = request.header
  val offsetCommitRequest = request.body[OffsetCommitRequest]

  // Per-partition errors decided in this method. They are filled in before any commit is attempted and are
  // merged into every response that goes through sendResponseCallback.
  val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
  val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()

  // the callback for sending an offset commit response
  def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
    val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
    if (isDebugEnabled)
      combinedCommitStatus.foreach { case (topicPartition, error) =>
        if (error != Errors.NONE) {
          debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
            s"on partition $topicPartition failed due to ${error.exceptionName}")
        }
      }
    sendResponseMaybeThrottle(request, requestThrottleMs =>
      new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
  }

  // reject the request if not authorized to the group
  if (!authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
    val error = Errors.GROUP_AUTHORIZATION_FAILED
    val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
      offsetCommitRequest.data.topics,
      error)

    sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
      new OffsetCommitResponseData()
        .setTopics(responseTopicList)
        .setThrottleTimeMs(requestThrottleMs)
    ))
  } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion < KAFKA_2_3_IV0) {
    // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
    // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
    // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
    val unsupportedVersionErrors = (for {
      topicData <- offsetCommitRequest.data.topics.asScala
      partitionData <- topicData.partitions.asScala
    } yield (new TopicPartition(topicData.name, partitionData.partitionIndex), Errors.UNSUPPORTED_VERSION)).toMap

    sendResponseCallback(unsupportedVersionErrors)
  } else {
    val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]

    val topics = offsetCommitRequest.data.topics.asScala
    val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)

    // Classify every requested partition: the authorization check takes precedence over the existence check.
    for (topicData <- topics; partitionData <- topicData.partitions.asScala) {
      val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
      if (!authorizedTopics.contains(topicData.name))
        unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
      else if (!metadataCache.contains(topicPartition))
        nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
      else
        authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
    }

    val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result()

    if (authorizedTopicRequestInfo.isEmpty)
      // Nothing to commit; the response then contains only the errors collected above.
      sendResponseCallback(Map.empty)
    else if (header.apiVersion == 0) {
      // for version 0 always store offsets to ZK
      val responseInfo = authorizedTopicRequestInfo.map {
        case (topicPartition, requestedCommit) =>
          try {
            val metadataTooLarge =
              Option(requestedCommit.committedMetadata()).exists(_.length > config.offsetMetadataMaxSize)
            if (metadataTooLarge)
              (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
            else {
              zkClient.setOrCreateConsumerOffset(
                offsetCommitRequest.data.groupId,
                topicPartition,
                requestedCommit.committedOffset)
              (topicPartition, Errors.NONE)
            }
          } catch {
            // Only recoverable failures become a per-partition error code. Fatal conditions (VirtualMachineError,
            // LinkageError, InterruptedException, ControlThrowable) propagate out of this method instead of
            // being reported to the client as an ordinary commit failure.
            case scala.util.control.NonFatal(e) => (topicPartition, Errors.forException(e))
          }
      }
      sendResponseCallback(responseInfo)
    } else {
      // for version 1 and beyond store offsets in offset manager

      // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
      // expire timestamp is computed differently for v1 and v2.
      //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
      //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that
      //   - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5.
      //   - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
      val currentTimestamp = time.milliseconds

      // The retention time is a request-level field, so the expiration is identical for every partition:
      // none when the request uses the default retention, otherwise now + the requested retention.
      val expireTimestampOpt = offsetCommitRequest.data.retentionTimeMs match {
        case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
        case retentionTime => Some(currentTimestamp + retentionTime)
      }

      val offsetsToCommit = authorizedTopicRequestInfo.map { case (topicPartition, requestedCommit) =>
        // A missing metadata string is stored as the NoMetadata placeholder.
        val metadata = Option(requestedCommit.committedMetadata).getOrElse(OffsetAndMetadata.NoMetadata)

        val leaderEpochOpt = if (requestedCommit.committedLeaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH)
          Optional.empty[Integer]
        else
          Optional.of[Integer](requestedCommit.committedLeaderEpoch)

        topicPartition -> new OffsetAndMetadata(
          offset = requestedCommit.committedOffset,
          leaderEpoch = leaderEpochOpt,
          metadata = metadata,
          // Use the broker's clock unless the request supplied its own commit timestamp.
          commitTimestamp = requestedCommit.commitTimestamp match {
            case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
            case customTimestamp => customTimestamp
          },
          expireTimestamp = expireTimestampOpt
        )
      }

      // call coordinator to handle commit offset
      groupCoordinator.handleCommitOffsets(
        offsetCommitRequest.data.groupId,
        offsetCommitRequest.data.memberId,
        Option(offsetCommitRequest.data.groupInstanceId),
        offsetCommitRequest.data.generationId,
        offsetsToCommit,
        sendResponseCallback)
    }
  }
}