in go/mqtt/connect.go [163:248]
func (c *SessionClient) connect(
ctx context.Context,
reconnect bool,
) (*paho.Connack, error) {
attempt := c.conn.Attempt()
conn, err := c.connectionProvider(ctx)
if err != nil {
return nil, err
}
var auther paho.Auther
if c.options.Auth != nil {
auther = &pahoAuther{c}
}
pahoClient := paho.NewClient(paho.ClientConfig{
ClientID: c.clientID,
Session: c.session,
Conn: conn,
AuthHandler: auther,
// Set Paho's packet timeout to the maximum possible value to
// effectively disable it. We can still control any timeouts through the
// contexts we pass into Paho.
PacketTimeout: math.MaxInt64,
// Disable automatic acking in Paho. The session client will manage acks
// instead.
EnableManualAcknowledgment: true,
OnPublishReceived: []func(paho.PublishReceived) (bool, error){
// Add 1 to the conn count for this because this listener is
// effective AFTER the connection succeeds.
c.makeOnPublishReceived(ctx, attempt),
},
OnServerDisconnect: func(d *paho.Disconnect) {
if isFatalDisconnectReasonCode(d.ReasonCode) {
c.conn.Disconnect(attempt, &FatalDisconnectError{d.ReasonCode})
} else {
c.conn.Disconnect(attempt, &DisconnectError{d.ReasonCode})
}
},
OnClientError: func(err error) {
c.conn.Disconnect(attempt, err)
},
})
connect, err := c.buildConnectPacket(ctx, reconnect)
if err != nil {
return nil, err
}
c.log.Packet(ctx, "connect", connect)
connack, err := pahoClient.Connect(ctx, connect)
c.log.Packet(ctx, "connack", connack)
switch {
case connack == nil:
// This assumes that all errors returned by Paho's connect method
// without a CONNACK are non-fatal.
return nil, err
case isFatalConnackReasonCode(connack.ReasonCode):
return nil, &FatalConnackError{connack.ReasonCode}
case connack.ReasonCode >= 80:
return nil, &ConnackError{connack.ReasonCode}
case reconnect && !connack.SessionPresent:
c.forceDisconnect(ctx, pahoClient)
return nil, &SessionLostError{}
default:
if err := c.conn.Connect(pahoClient); err != nil {
return nil, err
}
if c.options.Auth != nil && connack.Properties.AuthMethod == "" {
// Ensure the auth provider is notified of success even if the MQTT
// server fails to echo the auth method.
c.options.Auth.AuthSuccess(c.requestReauth)
}
return connack, nil
}
}