func()

in broker.go [1249:1308]


func (b *Broker) authenticateViaSASLv1() error {
	metricRegistry := b.metricRegistry
	if b.conf.Net.SASL.Handshake {
		handshakeRequest := &SaslHandshakeRequest{Mechanism: string(b.conf.Net.SASL.Mechanism), Version: b.conf.Net.SASL.Version}
		handshakeResponse := new(SaslHandshakeResponse)
		prom := makeResponsePromise(handshakeResponse.version())

		handshakeErr := b.sendInternal(handshakeRequest, prom)
		if handshakeErr != nil {
			Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
			return handshakeErr
		}
		handshakeErr = handleResponsePromise(handshakeRequest, handshakeResponse, prom, metricRegistry)
		if handshakeErr != nil {
			Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
			return handshakeErr
		}

		if !errors.Is(handshakeResponse.Err, ErrNoError) {
			return handshakeResponse.Err
		}
	}

	authSendReceiver := func(authBytes []byte) (*SaslAuthenticateResponse, error) {
		authenticateRequest := b.createSaslAuthenticateRequest(authBytes)
		authenticateResponse := new(SaslAuthenticateResponse)
		prom := makeResponsePromise(authenticateResponse.version())
		authErr := b.sendInternal(authenticateRequest, prom)
		if authErr != nil {
			Logger.Printf("Error while performing SASL Auth %s\n", b.addr)
			return nil, authErr
		}
		authErr = handleResponsePromise(authenticateRequest, authenticateResponse, prom, metricRegistry)
		if authErr != nil {
			Logger.Printf("Error while performing SASL Auth %s\n", b.addr)
			return nil, authErr
		}

		if !errors.Is(authenticateResponse.Err, ErrNoError) {
			var err error = authenticateResponse.Err
			if authenticateResponse.ErrorMessage != nil {
				err = Wrap(authenticateResponse.Err, errors.New(*authenticateResponse.ErrorMessage))
			}
			return nil, err
		}

		b.computeSaslSessionLifetime(authenticateResponse)
		return authenticateResponse, nil
	}

	switch b.conf.Net.SASL.Mechanism {
	case SASLTypeOAuth:
		provider := b.conf.Net.SASL.TokenProvider
		return b.sendAndReceiveSASLOAuth(authSendReceiver, provider)
	case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
		return b.sendAndReceiveSASLSCRAMv1(authSendReceiver, b.conf.Net.SASL.SCRAMClientGeneratorFunc())
	default:
		return b.sendAndReceiveSASLPlainAuthV1(authSendReceiver)
	}
}