in clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java [562:657]
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
log.error("JoinGroup failed due to inconsistent Protocol Type, received {} but expected {}",
joinResponse.data().protocolType(), protocolType());
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
log.debug("Received successful JoinGroup response: {}", joinResponse);
sensors.joinSensor.record(response.requestLatencyMs());
synchronized (AbstractCoordinator.this) {
if (state != MemberState.PREPARING_REBALANCE) {
// if the consumer was woken up before a rebalance completes, we may have already left
// the group. In this case, we do not want to continue with the sync group.
future.raise(new UnjoinedGroupException());
} else {
state = MemberState.COMPLETING_REBALANCE;
// we only need to enable heartbeat thread whenever we transit to
// COMPLETING_REBALANCE state since we always transit from this state to STABLE
if (heartbeatThread != null)
heartbeatThread.enable();
AbstractCoordinator.this.generation = new Generation(
joinResponse.data().generationId(),
joinResponse.data().memberId(), joinResponse.data().protocolName());
log.info("Successfully joined group with generation {}", AbstractCoordinator.this.generation);
if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
}
}
}
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());
// backoff and retry
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
log.debug("Attempt to join group failed due to unknown member id with {}.", sentGeneration);
// only need to reset the member id if generation has not been changed,
// then retry immediately
if (generationUnchanged())
resetGenerationOnResponseError(ApiKeys.JOIN_GROUP, error);
future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry with backoff
markCoordinatorUnknown();
log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
// for join-group request, even if the generation has changed we would not expect the instance id
// gets fenced, and hence we always treat this as a fatal error
log.error("Attempt to join group with generation {} failed because the group instance id {} has been fenced by another instance",
rebalanceConfig.groupInstanceId, sentGeneration);
future.raise(error);
} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
|| error == Errors.INVALID_SESSION_TIMEOUT
|| error == Errors.INVALID_GROUP_ID
|| error == Errors.GROUP_AUTHORIZATION_FAILED
|| error == Errors.GROUP_MAX_SIZE_REACHED) {
// log the error and re-throw the exception
log.error("Attempt to join group failed due to fatal error: {}", error.message());
if (error == Errors.GROUP_MAX_SIZE_REACHED) {
future.raise(new GroupMaxSizeReachedException("Consumer group " + rebalanceConfig.groupId +
" already has the configured maximum number of members."));
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else {
future.raise(error);
}
} else if (error == Errors.UNSUPPORTED_VERSION) {
log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" +
" to see if the problem resolves");
future.raise(error);
} else if (error == Errors.MEMBER_ID_REQUIRED) {
// Broker requires a concrete member id to be allowed to join the group. Update member id
// and send another join group request in next cycle.
String memberId = joinResponse.data().memberId();
log.debug("Attempt to join group returned {} error. Will set the member id as {} and then rejoin", error, memberId);
synchronized (AbstractCoordinator.this) {
AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
}
future.raise(error);
} else {
// unexpected error, throw the exception
log.error("Attempt to join group failed due to unexpected error: {}", error.message());
future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
}
}