in broker.go [1310:1380]
// sendAndReceiveSASLHandshake writes a SaslHandshakeRequest for the given
// mechanism and version to the broker connection, then synchronously reads
// and decodes the SaslHandshakeResponse.
//
// It returns nil when the response carries ErrNoError. Otherwise it returns:
//   - the encoding error, if the request cannot be encoded;
//   - ErrSASLHandshakeSendEOF, if the write fails with io.EOF;
//   - ErrBadTLSHandshake, if the write error message starts with "tls:";
//   - ErrSASLHandshakeReadEOF, if reading the response header fails with
//     io.ErrUnexpectedEOF;
//   - an error, if the response declares a length smaller than the 4 header
//     bytes that follow the length field;
//   - any other write, read or decode error, unchanged;
//   - res.Err, if the broker answered with an error code.
//
// The in-flight request metric is incremented before the write and is
// released on every write/read failure path; on a fully read response it is
// handed over to updateIncomingCommunicationMetrics.
func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
	rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
	buf, err := encode(req, b.metricRegistry)
	if err != nil {
		return err
	}

	requestTime := time.Now()
	// Will be decremented in updateIncomingCommunicationMetrics (except error)
	b.addRequestInFlightMetrics(1)
	bytes, err := b.write(buf)
	b.updateOutgoingCommunicationMetrics(bytes)
	if err != nil {
		b.addRequestInFlightMetrics(-1)
		Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
		switch {
		case errors.Is(err, io.EOF):
			return ErrSASLHandshakeSendEOF
		case strings.HasPrefix(err.Error(), "tls:"):
			// This is a workaround for the fact that the TLS subsystem
			// generally returns opaque string literals (which are not stable
			// between go versions) rather than documented / typed errors.
			// A TLS-related error while sending a SASL handshake almost
			// always indicates a failure in the underlying TLS handshake,
			// and we need a way to surface that case to the user, otherwise
			// it will fall back on "client has run out of available brokers
			// to talk to", which is hard to troubleshoot.
			return ErrBadTLSHandshake
		}
		return err
	}
	// The correlation ID is only consumed once the request has been written.
	b.correlationID++

	// Response header: a 4-byte big-endian length followed by 4 more bytes
	// (presumably the correlation ID) that are counted in that length.
	header := make([]byte, 8)
	_, err = b.readFull(header)
	if err != nil {
		b.addRequestInFlightMetrics(-1)
		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
		if errors.Is(err, io.ErrUnexpectedEOF) {
			return ErrSASLHandshakeReadEOF
		}
		return err
	}

	length := binary.BigEndian.Uint32(header[:4])
	// length is an unsigned value supplied by the peer: anything below 4
	// would make length-4 wrap around to ~4 GiB and trigger a huge
	// allocation, so reject it before sizing the payload buffer.
	if length < 4 {
		b.addRequestInFlightMetrics(-1)
		Logger.Printf("Failed to read SASL handshake payload : invalid response length %d\n", length)
		return errors.New("kafka: invalid SASL handshake response length")
	}

	// 4 of the declared bytes were already consumed as part of the header.
	payload := make([]byte, length-4)
	n, err := b.readFull(payload)
	if err != nil {
		b.addRequestInFlightMetrics(-1)
		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
		return err
	}
	// Account for the payload plus the 8 header bytes read above.
	b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))

	res := &SaslHandshakeResponse{}
	// NOTE(review): the response is decoded as version 0 regardless of the
	// requested version — presumably the layout is identical; confirm.
	err = versionedDecode(payload, res, 0, b.metricRegistry)
	if err != nil {
		Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
		return err
	}

	if !errors.Is(res.Err, ErrNoError) {
		Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
		return res.Err
	}

	DebugLogger.Print("Completed pre-auth SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
	return nil
}