func()

in pulsar/internal/connection.go [287:337]


// doHandshake sends a CONNECT command carrying the client's protocol
// version, client version, auth method/data and feature flags, then reads a
// single command back and inspects it.
//
// It returns true only when the reply carries a Connected payload; in that
// case the maximum message size is recorded on the connection and the state
// is changed to connectionReady. Every failure path logs a warning and
// returns false.
func (c *connection) doHandshake() bool {
	// Send 'Connect' command to initiate handshake
	authData, err := c.auth.GetData()
	if err != nil {
		c.log.WithError(err).Warn("Failed to load auth credentials")
		return false
	}

	// During the initial handshake, the internal keep alive is not
	// active yet, so we need to timeout write and read requests.
	// If the deadline cannot be armed, nothing bounds the read below,
	// so give up instead of risking an unbounded wait.
	if err = c.cnx.SetDeadline(time.Now().Add(c.keepAliveInterval)); err != nil {
		c.log.WithError(err).Warn("Failed to set handshake deadline")
		return false
	}

	cmdConnect := &pb.CommandConnect{
		ProtocolVersion: proto.Int32(PulsarProtocolVersion),
		ClientVersion:   proto.String(ClientVersionString),
		AuthMethodName:  proto.String(c.auth.Name()),
		AuthData:        authData,
		FeatureFlags: &pb.FeatureFlags{
			SupportsAuthRefresh:         proto.Bool(true),
			SupportsBrokerEntryMetadata: proto.Bool(true),
		},
	}

	// When the logical and physical hosts differ, pass the logical host
	// along as ProxyToBrokerUrl (presumably the physical peer is a proxy
	// that forwards to it; confirm against the caller).
	if c.logicalAddr.Host != c.physicalAddr.Host {
		cmdConnect.ProxyToBrokerUrl = proto.String(c.logicalAddr.Host)
	}

	c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, cmdConnect))
	cmd, _, err := c.reader.readSingleCommand()
	if err != nil {
		c.log.WithError(err).Warn("Failed to perform initial handshake")
		return false
	}

	// Reset the deadline so that we don't use read timeouts. A stale
	// absolute deadline would make later I/O on this connection time out,
	// so a connection whose deadline cannot be cleared is not marked ready.
	if err = c.cnx.SetDeadline(time.Time{}); err != nil {
		c.log.WithError(err).Warn("Failed to reset connection deadline after handshake")
		return false
	}

	if cmd.Connected == nil {
		c.log.Warnf("Failed to establish connection with broker: '%s'",
			cmd.Error.GetMessage())
		return false
	}

	// Use the size from the response only when it is present and positive;
	// otherwise fall back to the package default.
	if size := cmd.Connected.MaxMessageSize; size != nil && *size > 0 {
		c.log.Debug("Got MaxMessageSize from handshake response:", *size)
		c.maxMessageSize = *size
	} else {
		c.log.Debug("No MaxMessageSize from handshake response, use default: ", MaxMessageSize)
		c.maxMessageSize = MaxMessageSize
	}

	c.log.Info("Connection is ready")
	c.changeState(connectionReady)
	return true
}