in go/mqtt/connect.go [84:157]
// manageConnection runs the connect/reconnect loop for the session client.
//
// Each iteration connects through c.options.ConnectionRetry, signals the
// connection, and then blocks until either the current connection reports that
// it is down or ctx is done. c.cleanup(ctx) is deferred and runs on every
// return path.
//
// It returns:
//   - the error produced by the connection retry, if connecting fails;
//   - the *FatalDisconnectError, if the current connection ended with one;
//   - nil, if ctx is done while waiting on an established connection.
func (c *SessionClient) manageConnection(ctx context.Context) error {
	defer c.cleanup(ctx)

	// False for the very first connection; set once a connection has been
	// signaled so that every later call to c.connect is flagged as a reconnect.
	isReconnect := false

	for {
		var ack *paho.Connack

		// attempt performs one connection attempt. Its first result tells the
		// retry whether another attempt is allowed for the returned error.
		attempt := func(ctx context.Context) (bool, error) {
			attemptCtx := ctx
			if timeout := c.options.ConnectionTimeout; timeout > 0 {
				// Bound this single attempt; the timer is released when the
				// attempt returns.
				var release context.CancelFunc
				attemptCtx, release = context.WithTimeout(ctx, timeout)
				defer release()
			}

			var connErr error
			ack, connErr = c.connect(attemptCtx, isReconnect)

			// These errors are never wrapped, so a plain type switch is
			// enough to recognize the ones that must not be retried.
			switch connErr.(type) {
			case *InvalidArgumentError,
				*SessionLostError,
				*FatalConnackError,
				*FatalDisconnectError:
				return false, connErr
			}
			return true, connErr
		}

		if err := c.options.ConnectionRetry.Start(ctx, "connect", attempt); err != nil {
			return err
		}

		// NOTE: signalConnection and signalDisconnection must only be called
		// together in this loop to ensure ordering between the two.
		c.signalConnection(ctx, &ConnectEvent{ReasonCode: ack.ReasonCode})
		isReconnect = true

		select {
		case <-c.conn.Current().Down.Done():
			// The current paho instance got disconnected. Describe the
			// disconnection based on the recorded error, remembering whether
			// it is fatal for the whole session.
			var (
				event    *DisconnectEvent
				fatalErr error
			)
			switch cause := c.conn.Current().Error.(type) {
			case *FatalDisconnectError:
				event = &DisconnectEvent{ReasonCode: &cause.ReasonCode}
				fatalErr = cause
			case *DisconnectError:
				event = &DisconnectEvent{ReasonCode: &cause.ReasonCode}
			default:
				event = &DisconnectEvent{Error: cause}
			}

			c.signalDisconnection(ctx, event)
			if fatalErr != nil {
				return fatalErr
			}

		case <-ctx.Done():
			// The session client is shutting down.
			return nil
		}

		// Reaching this point means a reconnection will be attempted.
	}
}