go/mqtt/session_client.go (81 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. package mqtt import ( "math" "sync/atomic" "github.com/Azure/iot-operations-sdks/go/internal/log" "github.com/Azure/iot-operations-sdks/go/mqtt/internal" "github.com/Azure/iot-operations-sdks/go/mqtt/retry" "github.com/eclipse/paho.golang/paho" "github.com/eclipse/paho.golang/paho/session" "github.com/eclipse/paho.golang/paho/session/state" ) type ( // SessionClient implements an MQTT session client supporting MQTT v5 with // QoS 0 and QoS 1 support. SessionClient struct { // Used to ensure Start() is called only once and that user operations // are only started after Start() is called. sessionStarted atomic.Bool // Used to signal client shutdown for cleaning up background goroutines // and inflight operations. Only valid once started. shutdown *internal.Background // Tracker for the connection. Only valid once started. conn *internal.ConnectionTracker[*paho.Client] // A list of functions that listen for incoming messages. messageHandlers *internal.AppendableListWithRemoval[messageHandler] // A list of functions that are called in order to notify the user of // successful MQTT connections. connectEventHandlers *internal.AppendableListWithRemoval[ConnectEventHandler] // A list of functions that are called in order to notify the user of a // disconnection from the MQTT server. disconnectEventHandlers *internal.AppendableListWithRemoval[DisconnectEventHandler] // A list of functions that are called in goroutines to notify the user // of a session client termination due to a fatal error. fatalErrorHandlers *internal.AppendableListWithRemoval[func(error)] // Buffered channel containing the PUBLISH packets to be sent. outgoingPublishes chan *outgoingPublish // Paho's internal MQTT session tracker. session session.SessionManager clientID string connectionProvider ConnectionProvider options SessionClientOptions log internal.Logger } ) // NewSessionClient constructs a new session client with user options. func NewSessionClient( clientID string, connectionProvider ConnectionProvider, opts ...SessionClientOption, ) (*SessionClient, error) { if clientID == "" { return nil, &InvalidArgumentError{ message: "client ID must be configured", } } if connectionProvider == nil { return nil, &InvalidArgumentError{ message: "connection must be configured", } } // Default client options. client := &SessionClient{ clientID: clientID, connectionProvider: connectionProvider, conn: internal.NewConnectionTracker[*paho.Client](), messageHandlers: internal.NewAppendableListWithRemoval[messageHandler](), connectEventHandlers: internal.NewAppendableListWithRemoval[ConnectEventHandler](), disconnectEventHandlers: internal.NewAppendableListWithRemoval[DisconnectEventHandler](), fatalErrorHandlers: internal.NewAppendableListWithRemoval[func(error)](), outgoingPublishes: make(chan *outgoingPublish, maxPublishQueueSize), session: state.NewInMemory(), } client.options.Apply(opts) if client.options.KeepAlive == 0 { client.options.KeepAlive = 60 } if client.options.SessionExpiry == 0 { client.options.SessionExpiry = math.MaxUint32 } if client.options.ReceiveMaximum == 0 { client.options.ReceiveMaximum = math.MaxUint16 } if client.options.ConnectionRetry == nil { client.options.ConnectionRetry = &retry.ExponentialBackoff{ Logger: client.options.Logger, } } if !client.options.DisableAIOBrokerFeatures { if client.options.ConnectUserProperties == nil { client.options.ConnectUserProperties = make(map[string]string, 1) } client.options.ConnectUserProperties["metriccategory"] = "aiosdk-go" } client.log.Logger = log.Wrap(client.options.Logger) return client, nil } // ID returns the MQTT client ID for this session client. func (c *SessionClient) ID() string { return c.clientID }