public void authenticate()

in clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java [240:326]


    /**
     * Advances the client-side SASL authentication state machine by one step, based on the
     * current value of {@code saslState}.
     *
     * <p>If {@code netOutBuffer} is non-null and {@code flushNetOutBufferAndUpdateInterestOps()}
     * returns {@code false}, the method returns without examining the state (presumably because
     * previously buffered output has not been completely written yet - confirm against that helper).
     *
     * <p>State progression as implemented by the switch below:
     * <ul>
     *   <li>Initial authentication: {@code SEND_APIVERSIONS_REQUEST} ->
     *       {@code RECEIVE_APIVERSIONS_RESPONSE} -> (falls through {@code SEND_HANDSHAKE_REQUEST}) ->
     *       {@code RECEIVE_HANDSHAKE_RESPONSE} -> (falls through {@code INITIAL}) ->
     *       {@code INTERMEDIATE} -> {@code COMPLETE}, or {@code CLIENT_COMPLETE} -> {@code COMPLETE}.</li>
     *   <li>Re-authentication: {@code REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE} ->
     *       (falls through {@code REAUTH_SEND_HANDSHAKE_REQUEST}) ->
     *       {@code REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE} -> (falls through {@code REAUTH_INITIAL}) ->
     *       {@code INTERMEDIATE}, after which the flow is the same as for initial authentication.</li>
     * </ul>
     *
     * <p>In every receiving state, a {@code null} result from {@code receiveKafkaResponse()} or
     * {@code receiveToken()} leaves {@code saslState} untouched by this method, so the same state is
     * processed again on the next invocation. The fall-throughs between cases are intentional: they
     * let a received response be acted upon (by sending the next request/token) within the same call.
     *
     * @throws IOException declared by this method; presumably propagated from the send/receive
     *         helpers invoked here (their bodies are not visible in this block)
     * @throws IllegalStateException if invoked while {@code saslState} is {@code FAILED}
     */
    public void authenticate() throws IOException {
        // Do not process the state machine while the flush helper reports false for pending output.
        if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
            return;

        switch (saslState) {
            case SEND_APIVERSIONS_REQUEST:
                // Always use version 0 request since brokers treat requests with schema exceptions as GSSAPI tokens
                ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest.Builder().build((short) 0);
                send(apiVersionsRequest.toSend(nextRequestHeader(ApiKeys.API_VERSIONS, apiVersionsRequest.version())));
                setSaslState(SaslState.RECEIVE_APIVERSIONS_RESPONSE);
                break;
            case RECEIVE_APIVERSIONS_RESPONSE:
                ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) receiveKafkaResponse();
                // A null response leaves the state unchanged so this case runs again on the next call.
                if (apiVersionsResponse == null)
                    break;
                else {
                    setSaslAuthenticateAndHandshakeVersions(apiVersionsResponse);
                    // Retain the broker's response on reauthInfo (its later use is outside this block).
                    reauthInfo.apiVersionsResponseReceivedFromBroker = apiVersionsResponse;
                    setSaslState(SaslState.SEND_HANDSHAKE_REQUEST);
                    // Fall through to send handshake request with the latest supported version
                }
            case SEND_HANDSHAKE_REQUEST:
                sendHandshakeRequest(saslHandshakeVersion);
                setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
                break;
            case RECEIVE_HANDSHAKE_RESPONSE:
                // NOTE: this local is declared in the switch block scope and is reused (assigned) by the
                // REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE case further down.
                SaslHandshakeResponse handshakeResponse = (SaslHandshakeResponse) receiveKafkaResponse();
                // A null response leaves the state unchanged so this case runs again on the next call.
                if (handshakeResponse == null)
                    break;
                else {
                    handleSaslHandshakeResponse(handshakeResponse);
                    setSaslState(SaslState.INITIAL);
                    // Fall through and start SASL authentication using the configured client mechanism
                }
            case INITIAL:
                sendInitialToken();
                setSaslState(SaslState.INTERMEDIATE);
                break;
            case REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE:
                // Re-authentication sends no new ApiVersions request here: versions are derived from the
                // response stored on reauthInfo from the original authentication.
                setSaslAuthenticateAndHandshakeVersions(reauthInfo.apiVersionsResponseFromOriginalAuthentication);
                setSaslState(SaslState.REAUTH_SEND_HANDSHAKE_REQUEST); // Will set immediately
                // Fall through to send handshake request with the latest supported version
            case REAUTH_SEND_HANDSHAKE_REQUEST:
                sendHandshakeRequest(saslHandshakeVersion);
                setSaslState(SaslState.REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE);
                break;
            case REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE:
                // NOTE(review): the state name suggests responses other than the handshake response may
                // arrive here; the unconditional cast relies on receiveKafkaResponse() yielding only a
                // SaslHandshakeResponse or null in this state - confirm against that helper.
                handshakeResponse = (SaslHandshakeResponse) receiveKafkaResponse();
                // A null response leaves the state unchanged so this case runs again on the next call.
                if (handshakeResponse == null)
                    break;
                handleSaslHandshakeResponse(handshakeResponse);
                setSaslState(SaslState.REAUTH_INITIAL); // Will set immediately
                /*
                 * Fall through and start SASL authentication using the configured client
                 * mechanism. Note that we have to either fall through or add a loop to enter
                 * the switch statement again. We will fall through to avoid adding the loop and
                 * therefore minimize the changes to authentication-related code due to the
                 * changes related to re-authentication.
                 */
            case REAUTH_INITIAL:
                sendInitialToken();
                setSaslState(SaslState.INTERMEDIATE);
                break;
            case INTERMEDIATE:
                byte[] serverToken = receiveToken();
                // True only when a server token was received AND sendSaslClientToken(...) returned false
                // (presumably meaning no further client token was sent - confirm against that helper).
                // When serverToken is null, sendSaslClientToken is not invoked at all (short-circuit).
                boolean noResponsesPending = serverToken != null && !sendSaslClientToken(serverToken, false);
                // For versions without SASL_AUTHENTICATE header, SASL exchange may be complete after a token is sent to server.
                // For versions with SASL_AUTHENTICATE header, server always sends a response to each SASL_AUTHENTICATE request.
                if (saslClient.isComplete()) {
                    if (saslAuthenticateVersion == DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER || noResponsesPending)
                        setSaslState(SaslState.COMPLETE);
                    else
                        // The client side is done but a server response is still awaited; see CLIENT_COMPLETE.
                        setSaslState(SaslState.CLIENT_COMPLETE);
                }
                // If saslClient is not complete, the state stays INTERMEDIATE for the next call.
                break;
            case CLIENT_COMPLETE:
                // Wait for the outstanding server response; its content is not inspected here, only
                // its arrival (non-null) moves the state to COMPLETE.
                byte[] serverResponse = receiveToken();
                if (serverResponse != null)
                    setSaslState(SaslState.COMPLETE);
                break;
            case COMPLETE:
                // Nothing left to do once authentication has completed.
                break;
            case FAILED:
                // Should never get here since exception would have been propagated earlier
                throw new IllegalStateException("SASL handshake has already failed");
        }
    }