in broker.go [1249:1308]
// authenticateViaSASLv1 authenticates this broker connection by exchanging
// SaslAuthenticate requests with the broker.
//
// If the SASL handshake is enabled in the configuration, a
// SaslHandshakeRequest is sent first and any failure (transport error or a
// non-ErrNoError response code) aborts the authentication. The actual token
// exchange is then delegated to the mechanism-specific routine selected from
// the configured SASL mechanism: OAuth, SCRAM (SHA-256/SHA-512), or the plain
// V1 routine for everything else.
func (b *Broker) authenticateViaSASLv1() error {
	// Captured once so the handshake and every authenticate round trip
	// report against the same registry value.
	registry := b.metricRegistry
	// Pointer alias: fields are still read from the live configuration at
	// the point of use, not from a snapshot.
	saslConf := &b.conf.Net.SASL

	if saslConf.Handshake {
		req := &SaslHandshakeRequest{
			Mechanism: string(saslConf.Mechanism),
			Version:   saslConf.Version,
		}
		resp := new(SaslHandshakeResponse)
		promise := makeResponsePromise(resp.version())

		// Send, then wait for the response; both failure modes are logged
		// and returned the same way.
		err := b.sendInternal(req, promise)
		if err == nil {
			err = handleResponsePromise(req, resp, promise, registry)
		}
		if err != nil {
			Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
			return err
		}

		if !errors.Is(resp.Err, ErrNoError) {
			return resp.Err
		}
	}

	// exchange performs one SaslAuthenticate round trip carrying payload and
	// returns the broker's response. It is handed to the mechanism-specific
	// routines below, which call it as their send/receive primitive.
	exchange := func(payload []byte) (*SaslAuthenticateResponse, error) {
		req := b.createSaslAuthenticateRequest(payload)
		resp := new(SaslAuthenticateResponse)
		promise := makeResponsePromise(resp.version())

		err := b.sendInternal(req, promise)
		if err == nil {
			err = handleResponsePromise(req, resp, promise, registry)
		}
		if err != nil {
			Logger.Printf("Error while performing SASL Auth %s\n", b.addr)
			return nil, err
		}

		if errors.Is(resp.Err, ErrNoError) {
			// Successful round trip: let the broker record the session
			// lifetime from the response before handing it back.
			b.computeSaslSessionLifetime(resp)
			return resp, nil
		}

		// Broker rejected the request; attach its error message when present.
		if resp.ErrorMessage == nil {
			return nil, resp.Err
		}
		return nil, Wrap(resp.Err, errors.New(*resp.ErrorMessage))
	}

	switch saslConf.Mechanism {
	case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
		return b.sendAndReceiveSASLSCRAMv1(exchange, saslConf.SCRAMClientGeneratorFunc())
	case SASLTypeOAuth:
		return b.sendAndReceiveSASLOAuth(exchange, saslConf.TokenProvider)
	}
	// Any other mechanism falls through to the plain V1 routine.
	return b.sendAndReceiveSASLPlainAuthV1(exchange)
}