func()

in broker.go [1310:1380]


// sendAndReceiveSASLHandshake sends a SaslHandshakeRequest for the given
// mechanism and request version on the broker connection and then reads and
// decodes the response synchronously.
//
// It returns nil when the decoded response carries ErrNoError. Otherwise it
// returns one of:
//   - the encoding error, if the request could not be encoded;
//   - ErrSASLHandshakeSendEOF, if the write failed with io.EOF;
//   - ErrBadTLSHandshake, if the write failed with an error whose text starts
//     with "tls:";
//   - ErrSASLHandshakeReadEOF, if reading the response header failed with
//     io.ErrUnexpectedEOF;
//   - an error if the response header declares a length smaller than 4 bytes;
//   - any other write, read or decode error, unchanged;
//   - the error carried by the response itself.
//
// The in-flight request metric is incremented before the write and is
// decremented again on every failure path up to and including the payload
// read; on success updateIncomingCommunicationMetrics is responsible for it.
func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
	const (
		// headerLen is the number of bytes read as the response header.
		headerLen = 8
		// lengthFieldLen is the size of the big-endian length prefix at the
		// start of the header. The declared length counts the rest of the
		// header plus the payload, so payload size = length - (headerLen - lengthFieldLen).
		lengthFieldLen = 4
	)

	rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}

	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
	buf, err := encode(req, b.metricRegistry)
	if err != nil {
		return err
	}

	requestTime := time.Now()
	// Will be decremented in updateIncomingCommunicationMetrics (except error)
	b.addRequestInFlightMetrics(1)
	written, err := b.write(buf)
	b.updateOutgoingCommunicationMetrics(written)
	if err != nil {
		b.addRequestInFlightMetrics(-1)
		Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
		switch {
		case errors.Is(err, io.EOF):
			return ErrSASLHandshakeSendEOF
		case strings.HasPrefix(err.Error(), "tls:"):
			// This is a workaround for the fact that the TLS subsystem
			// generally returns opaque string literals (which are not stable
			// between go versions) rather than documented / typed errors.
			// A TLS-related error while sending a SASL handshake almost
			// always indicates a failure in the underlying TLS handshake,
			// and we need a way to surface that case to the user, otherwise
			// it will fall back on "client has run out of available brokers
			// to talk to", which is hard to troubleshoot.
			return ErrBadTLSHandshake
		}
		return err
	}
	// Only advance the correlation ID once the request has actually been written.
	b.correlationID++

	header := make([]byte, headerLen) // response header
	_, err = b.readFull(header)
	if err != nil {
		b.addRequestInFlightMetrics(-1)
		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
		// errors.Is (rather than ==) so a wrapped io.ErrUnexpectedEOF is
		// still recognised, matching the handling of the write error above.
		if errors.Is(err, io.ErrUnexpectedEOF) {
			return ErrSASLHandshakeReadEOF
		}
		return err
	}

	length := binary.BigEndian.Uint32(header[:lengthFieldLen])
	if length < headerLen-lengthFieldLen {
		// The length comes straight off the wire. Without this guard the
		// unsigned subtraction below would wrap around and request a
		// multi-gigabyte allocation for a malformed or hostile response.
		b.addRequestInFlightMetrics(-1)
		Logger.Printf("Invalid SASL handshake response length : %d\n", length)
		return errors.New("kafka: invalid SASL handshake response length")
	}

	payload := make([]byte, length-(headerLen-lengthFieldLen))
	n, err := b.readFull(payload)
	if err != nil {
		b.addRequestInFlightMetrics(-1)
		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
		return err
	}

	// Account for the header bytes as well as the payload bytes.
	b.updateIncomingCommunicationMetrics(n+headerLen, time.Since(requestTime))
	res := &SaslHandshakeResponse{}

	// NOTE(review): the response is decoded as version 0 regardless of the
	// requested version; confirm the response layout is identical for every
	// version this function is called with.
	err = versionedDecode(payload, res, 0, b.metricRegistry)
	if err != nil {
		Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
		return err
	}

	if !errors.Is(res.Err, ErrNoError) {
		Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
		return res.Err
	}

	DebugLogger.Print("Completed pre-auth SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
	return nil
}