in clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java [1172:1273]
/**
 * Handles the broker's {@link OffsetCommitResponse} for a previously sent commit request.
 *
 * <p>Walks every topic/partition in the response and dispatches on its error code:
 * <ul>
 *   <li>{@code NONE}: the commit for that partition succeeded; continue.</li>
 *   <li>{@code TOPIC_AUTHORIZATION_FAILED}: accumulated into {@code unauthorizedTopics}
 *       and reported together only after all partitions have been inspected.</li>
 *   <li>Any other error: the first such error short-circuits the loop — the
 *       {@code future} is raised with an appropriate exception and the method returns
 *       immediately, so remaining partitions are not examined.</li>
 * </ul>
 * The future is completed successfully only when no partition reported a non-NONE
 * error other than topic-authorization failures, and that set is empty.
 *
 * @param commitResponse the decoded offset-commit response body from the broker
 * @param future the future to complete on success or raise with the mapped error
 */
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
    // NOTE(review): `response` (vs. the `commitResponse` parameter) is presumably the
    // raw ClientResponse field held by the enclosing response-handler class — it is not
    // declared in this method; confirm against the enclosing class.
    sensors.commitSensor.record(response.requestLatencyMs());
    // Topics that failed with TOPIC_AUTHORIZATION_FAILED; reported as a single
    // TopicAuthorizationException after the full response has been scanned.
    Set<String> unauthorizedTopics = new HashSet<>();
    for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
        for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
            TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
            // NOTE(review): assumes every partition echoed in the response was present in
            // this.offsets (the offsets we asked to commit) — otherwise this NPEs on the
            // next line. TODO confirm the request/response invariant upholds this.
            OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
            long offset = offsetAndMetadata.offset();
            Errors error = Errors.forCode(partition.errorCode());
            if (error == Errors.NONE) {
                log.debug("Committed offset {} for partition {}", offset, tp);
            } else {
                // Log retriable failures at WARN and everything else at ERROR before
                // deciding how to surface the error to the caller below.
                if (error.exception() instanceof RetriableException) {
                    log.warn("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message());
                } else {
                    log.error("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message());
                }
                if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    // Fatal for the whole group: surface a group-scoped authorization error.
                    future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
                    return;
                } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                    // Not immediately fatal: collect and keep scanning the remaining
                    // partitions so all unauthorized topics are reported at once.
                    unauthorizedTopics.add(tp.topic());
                } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
                        || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                    // raise the error to the user
                    future.raise(error);
                    return;
                } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS
                        || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                    // just retry
                    future.raise(error);
                    return;
                } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                        || error == Errors.NOT_COORDINATOR
                        || error == Errors.REQUEST_TIMED_OUT) {
                    // The coordinator is gone or unresponsive: mark it unknown so a
                    // rediscovery happens before any retry, then raise the retriable error.
                    markCoordinatorUnknown();
                    future.raise(error);
                    return;
                } else if (error == Errors.FENCED_INSTANCE_ID) {
                    log.info("OffsetCommit failed with {} due to group instance id {} fenced", sentGeneration, rebalanceConfig.groupInstanceId);
                    // if the generation has changed or we are not in rebalancing, do not raise the fatal error but rebalance-in-progress
                    if (generationUnchanged()) {
                        // Same generation we sent with: the fence is genuine and fatal.
                        future.raise(error);
                    } else {
                        if (ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
                            future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
                                "consumer member's old generation is fenced by its group instance id, it is possible that " +
                                "this consumer has already participated another rebalance and got a new generation"));
                        } else {
                            future.raise(new CommitFailedException());
                        }
                    }
                    return;
                } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                    /* Consumer should not try to commit offset in between join-group and sync-group,
                     * and hence on broker-side it is not expected to see a commit offset request
                     * during CompletingRebalance phase; if it ever happens then broker would return
                     * this error to indicate that we are still in the middle of a rebalance.
                     * In this case we would throw a RebalanceInProgressException,
                     * request re-join but do not reset generations. If the callers decide to retry they
                     * can go ahead and call poll to finish up the rebalance first, and then try commit again.
                     */
                    requestRejoin();
                    future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
                        "consumer group is executing a rebalance at the moment. You can try completing the rebalance " +
                        "by calling poll() and then retry commit again"));
                    return;
                } else if (error == Errors.UNKNOWN_MEMBER_ID
                        || error == Errors.ILLEGAL_GENERATION) {
                    log.info("OffsetCommit failed with {}: {}", sentGeneration, error.message());
                    // only need to reset generation and re-join group if generation has not changed or we are not in rebalancing;
                    // otherwise only raise rebalance-in-progress error
                    if (!generationUnchanged() && ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
                        future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
                            "consumer member's generation is already stale, meaning it has already participated another rebalance and " +
                            "got a new generation. You can try completing the rebalance by calling poll() and then retry commit again"));
                    } else {
                        // Stale member/generation outside an in-flight rebalance: reset our
                        // generation state and report the commit as failed.
                        resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
                        future.raise(new CommitFailedException());
                    }
                    return;
                } else {
                    // Unmapped error code: treat as fatal and surface as a generic KafkaException.
                    future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
                    return;
                }
            }
        }
    }
    // Only reached when no partition raised above; fail on collected authorization
    // errors, otherwise the commit succeeded for every partition.
    if (!unauthorizedTopics.isEmpty()) {
        log.error("Not authorized to commit to topics {}", unauthorizedTopics);
        future.raise(new TopicAuthorizationException(unauthorizedTopics));
    } else {
        future.complete(null);
    }
}