in clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java [240:326]
public void authenticate() throws IOException {
    // Drives the client-side SASL state machine keyed on saslState. Each call performs the
    // work of the current state and, wherever a case deliberately falls through, the work
    // of the following state as well. States that wait on the broker leave saslState
    // untouched when no response/token is available yet, so the same state is retried on
    // the next call.
    if (netOutBuffer != null) {
        // NOTE(review): presumably false means the buffered bytes are not fully written yet - confirm.
        boolean flushed = flushNetOutBufferAndUpdateInterestOps();
        if (!flushed) {
            return;
        }
    }

    switch (saslState) {
        case SEND_APIVERSIONS_REQUEST: {
            // Always use version 0 request since brokers treat requests with schema exceptions as GSSAPI tokens
            ApiVersionsRequest versionsRequest = new ApiVersionsRequest.Builder().build((short) 0);
            send(versionsRequest.toSend(nextRequestHeader(ApiKeys.API_VERSIONS, versionsRequest.version())));
            setSaslState(SaslState.RECEIVE_APIVERSIONS_RESPONSE);
            break;
        }

        case RECEIVE_APIVERSIONS_RESPONSE: {
            ApiVersionsResponse brokerVersions = (ApiVersionsResponse) receiveKafkaResponse();
            if (brokerVersions == null) {
                // No response obtained on this call; remain in this state.
                break;
            }
            setSaslAuthenticateAndHandshakeVersions(brokerVersions);
            // Kept on reauthInfo (not only used locally) so it stays available after this call.
            reauthInfo.apiVersionsResponseReceivedFromBroker = brokerVersions;
            setSaslState(SaslState.SEND_HANDSHAKE_REQUEST);
        }
        // Fall through: send the handshake request using the version selected above.

        case SEND_HANDSHAKE_REQUEST: {
            sendHandshakeRequest(saslHandshakeVersion);
            setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
            break;
        }

        case RECEIVE_HANDSHAKE_RESPONSE: {
            SaslHandshakeResponse brokerHandshake = (SaslHandshakeResponse) receiveKafkaResponse();
            if (brokerHandshake == null) {
                // No response obtained on this call; remain in this state.
                break;
            }
            handleSaslHandshakeResponse(brokerHandshake);
            setSaslState(SaslState.INITIAL);
        }
        // Fall through: begin the SASL exchange with the configured client mechanism.

        case INITIAL: {
            sendInitialToken();
            setSaslState(SaslState.INTERMEDIATE);
            break;
        }

        case REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE: {
            // Re-authentication reuses the ApiVersionsResponse stored on reauthInfo from the
            // original authentication; no new ApiVersions request is sent on this path.
            setSaslAuthenticateAndHandshakeVersions(reauthInfo.apiVersionsResponseFromOriginalAuthentication);
            setSaslState(SaslState.REAUTH_SEND_HANDSHAKE_REQUEST); // Will set immediately
        }
        // Fall through: send the handshake request using the version selected above.

        case REAUTH_SEND_HANDSHAKE_REQUEST: {
            sendHandshakeRequest(saslHandshakeVersion);
            setSaslState(SaslState.REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE);
            break;
        }

        case REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE: {
            SaslHandshakeResponse reauthHandshake = (SaslHandshakeResponse) receiveKafkaResponse();
            if (reauthHandshake == null) {
                // No response obtained on this call; remain in this state.
                break;
            }
            handleSaslHandshakeResponse(reauthHandshake);
            setSaslState(SaslState.REAUTH_INITIAL); // Will set immediately
        }
        // Fall through: begin the SASL exchange with the configured client mechanism.
        // The alternative to falling through would be a loop that re-enters the switch;
        // falling through avoids that loop and keeps the re-authentication additions to
        // this authentication code as small as possible.

        case REAUTH_INITIAL: {
            sendInitialToken();
            setSaslState(SaslState.INTERMEDIATE);
            break;
        }

        case INTERMEDIATE: {
            byte[] challenge = receiveToken();
            // True only when a server token arrived and sendSaslClientToken returned false for it.
            boolean noResponsesPending = false;
            if (challenge != null) {
                noResponsesPending = !sendSaslClientToken(challenge, false);
            }
            if (!saslClient.isComplete()) {
                break;
            }
            // Without the SASL_AUTHENTICATE header the exchange may already be finished once a
            // token has been sent to the server. With the header, the server answers every
            // SASL_AUTHENTICATE request, so a reply may still be outstanding.
            boolean exchangeFinished =
                saslAuthenticateVersion == DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER || noResponsesPending;
            setSaslState(exchangeFinished ? SaslState.COMPLETE : SaslState.CLIENT_COMPLETE);
            break;
        }

        case CLIENT_COMPLETE: {
            // The client side is done; move to COMPLETE once receiveToken() yields a token.
            byte[] finalServerToken = receiveToken();
            if (finalServerToken != null) {
                setSaslState(SaslState.COMPLETE);
            }
            break;
        }

        case COMPLETE: {
            // Nothing left to do.
            break;
        }

        case FAILED: {
            // Should never get here since exception would have been propagated earlier
            throw new IllegalStateException("SASL handshake has already failed");
        }
    }
}