in pulsar/internal/connection.go [287:337]
func (c *connection) doHandshake() bool {
// Send 'Connect' command to initiate handshake
authData, err := c.auth.GetData()
if err != nil {
c.log.WithError(err).Warn("Failed to load auth credentials")
return false
}
// During the initial handshake, the internal keep alive is not
// active yet, so we need to timeout write and read requests
c.cnx.SetDeadline(time.Now().Add(c.keepAliveInterval))
cmdConnect := &pb.CommandConnect{
ProtocolVersion: proto.Int32(PulsarProtocolVersion),
ClientVersion: proto.String(ClientVersionString),
AuthMethodName: proto.String(c.auth.Name()),
AuthData: authData,
FeatureFlags: &pb.FeatureFlags{
SupportsAuthRefresh: proto.Bool(true),
SupportsBrokerEntryMetadata: proto.Bool(true),
},
}
if c.logicalAddr.Host != c.physicalAddr.Host {
cmdConnect.ProxyToBrokerUrl = proto.String(c.logicalAddr.Host)
}
c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, cmdConnect))
cmd, _, err := c.reader.readSingleCommand()
if err != nil {
c.log.WithError(err).Warn("Failed to perform initial handshake")
return false
}
// Reset the deadline so that we don't use read timeouts
c.cnx.SetDeadline(time.Time{})
if cmd.Connected == nil {
c.log.Warnf("Failed to establish connection with broker: '%s'",
cmd.Error.GetMessage())
return false
}
if cmd.Connected.MaxMessageSize != nil && *cmd.Connected.MaxMessageSize > 0 {
c.log.Debug("Got MaxMessageSize from handshake response:", *cmd.Connected.MaxMessageSize)
c.maxMessageSize = *cmd.Connected.MaxMessageSize
} else {
c.log.Debug("No MaxMessageSize from handshake response, use default: ", MaxMessageSize)
c.maxMessageSize = MaxMessageSize
}
c.log.Info("Connection is ready")
c.changeState(connectionReady)
return true
}