constructor()

in common/transport/mqtt/src/mqtt_base.ts [152:326]


  /**
   * Creates the MQTT base object and builds the state machine that drives its connection lifecycle.
   *
   * The state machine starts in `disconnected` and has four other states: `connecting`, `connected`,
   * `disconnecting` and `reconnecting`. The transitional states (`connecting`, `disconnecting`,
   * `reconnecting`) defer any input they do not handle themselves until the next transition.
   *
   * @param mqttProvider Object to use in place of the `mqtt` module. When falsy, `require('mqtt')` is used.
   */
  constructor(mqttProvider?: any) {
    super();
    this.mqttProvider = mqttProvider || require('mqtt');
    this._onTheWirePublishes = new OnTheWireMessageContainer();

    // Shared tail of both reconnect paths of the 'reconnecting' state (graceful and forced):
    // open a new connection, then move to 'connected' on success or 'disconnected' on failure,
    // handing `callback` to the target state so it can complete the operation.
    const reconnectClient = (callback: (err?: Error, result?: any) => void): void => {
      debug('mqtt client disconnected - reconnecting');
      this._connectClient((err, connack) => {
        if (err) {
          debugErrors('failed to reconnect the client: ' + err.toString());
          this._fsm.transition('disconnected', callback, err);
        } else {
          debug('mqtt client reconnected successfully');
          // Register the 'error' listener on every successful (re)connection, exactly as the
          // 'connecting' state does. The forced reconnect path used to skip this step.
          // NOTE(review): assumes _connectClient does not leave a persistent 'error' listener of
          // its own on the new client, as the other success paths imply - confirm.
          this._mqttTrackedListeners.addTrackedListener('error', this._errorCallback.bind(this));
          this._fsm.transition('connected', callback, connack);
        }
      });
    };

    this._fsm = new machina.Fsm({
      namespace: 'mqtt-base',
      initialState: 'disconnected',
      states: {
        disconnected: {
          _onEnter: (callback, err) => {
            debug('In MQTT base FSM - entered onEnter for disconnect');
            //
            // The semantics of this _onEnter for the disconnected state (which is the initial state)
            // is that we got here from another one of the states of this FSM.
            //
            // So there was a disconnection.
            //
            // If there are any outstanding publishes, fail them.  We will never see
            // their acknowledgements (PUBACK).  It is important to acknowledge that
            // the publishes that were "on the wire", might indeed make it to the peer.  We'll
            // never know.  If the code further up the stack retries, we could indeed get
            // duplication of published data.  Nothing we can really do about it.
            //
            this._onTheWirePublishes.purge(err);
            //
            // One of the other states was able to pass along a callback.  Use it to finish up whatever
            // operation the state machine was working on.
            //
            // If there is no callback present, the clear implication is that something pretty major occurred,
            // NOT in the context of any particular operation.  There is NO operation that this error can be reported
            // as a result for.  Hence we emit the 'error' event.
            //
            if (callback) {
              callback(err);
            } else {
              if (err) {
                debugErrors('In mqtt base - no callback for error - emitting \'error\': ' + this._errorDescription(err));
                this.emit('error', err);
              }
            }
          },
          connect: (callback) => this._fsm.transition('connecting', callback),
          // Already disconnected: nothing to tear down, report success immediately.
          disconnect: (callback) => callback(),
          /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_020: [The `publish` method shall call the callback with a `NotConnectedError` if the connection hasn't been established prior to calling `publish`.]*/
          publish: (_topic, _payload, _options, callback) => callback(new errors.NotConnectedError()),
          /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_026: [The `subscribe` method shall call the callback with a `NotConnectedError` if the connection hasn't been established prior to calling `publish`.]*/
          subscribe: (_topic, _options, callback) => callback(new errors.NotConnectedError()),
          /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_027: [The `unsubscribe` method shall call the callback with a `NotConnectedError` if the connection hasn't been established prior to calling `publish`.]*/
          unsubscribe: (_topic, callback) => callback(new errors.NotConnectedError()),
          updateSharedAccessSignature: (callback) => {
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_034: [The `updateSharedAccessSignature` method shall not trigger any network activity if the mqtt client is not connected.]*/
            debug('updating shared access signature while disconnected');
            callback();
          }
        },
        connecting: {
          _onEnter: (connectCallback) => {
            this._connectClient((err, connack) => {
              if (err) {
                // Go through 'disconnecting' so the client is torn down before the error is
                // reported by the 'disconnected' state.
                this._fsm.transition('disconnecting', connectCallback, err);
              } else {
                this._mqttTrackedListeners.addTrackedListener('error', this._errorCallback.bind(this));
                this._fsm.transition('connected', connectCallback, connack);
              }
            });
          },
          disconnect: (callback) => {
            this._fsm.transition('disconnecting', callback);
          },
          // Any other input is replayed once the state machine has left this state.
          '*': () => this._fsm.deferUntilTransition()
        },
        connected: {
          _onEnter: (connectCallback, connack) => {
            this._mqttTrackedListeners.addTrackedListener('close', this._closeCallback.bind(this));
            connectCallback(null, new results.Connected(connack));
          },
          // Already connected: report success without touching the network.
          connect: (callback) => callback(null, new results.Connected()),
          disconnect: (callback) => this._fsm.transition('disconnecting', callback),
          publish: (topic, payload, options, callback) => {
            // Track the publish so that its callback can be failed by the 'disconnected' state
            // if the connection drops before the mqtt client reports completion.
            const thisPublishIdentifier = this._onTheWirePublishes.provideIdentifier();
            this._onTheWirePublishes.add(thisPublishIdentifier, new OnTheWireMessage(callback));
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_017: [The `publish` method publishes a `payload` on a `topic` using `options`.]*/
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_021: [The  `publish` method shall call `publish` on the mqtt client object and call the `callback` argument with `null` and the `puback` object if it succeeds.]*/
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_022: [The `publish` method shall call the `callback` argument with an Error if the operation fails.]*/
            this._mqttClient.publish(topic, payload, options, (err, result) => {
              this._onTheWirePublishes.complete(thisPublishIdentifier, err, result);
            });
          },
          subscribe: (topic, options, callback) => {
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_12_008: [The `subscribe` method shall call `subscribe`  on MQTT.JS  library and pass it the `topic` and `options` arguments.]*/
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_024: [The `subscribe` method shall call the callback with `null` and the `suback` object if the mqtt library successfully subscribes to the `topic`.]*/
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_025: [The `subscribe` method shall call the callback with an `Error` if the mqtt library fails to subscribe to the `topic`.]*/
            this._mqttClient.subscribe(topic, options, callback);
          },
          unsubscribe: (topic, callback) => {
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_028: [The `unsubscribe` method shall call `unsubscribe` on the mqtt library and pass it the `topic`.]*/
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_029: [The `unsubscribe` method shall call the `callback` argument with no arguments if the operation succeeds.]*/
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_030: [The `unsubscribe` method shall call the `callback` argument with an `Error` if the operation fails.]*/
            this._mqttClient.unsubscribe(topic, callback);
          },
          updateSharedAccessSignature: (callback) => {
            this._fsm.transition('reconnecting', callback);
          },
          closeEvent: () => {
            // No callback is passed: the close did not happen as part of an operation, so the
            // 'disconnected' state reports it by emitting 'error'.
            this._fsm.transition('disconnected', undefined, new errors.NotConnectedError('Connection to the server has been closed.'));
          }
        },
        disconnecting: {
          _onEnter: (disconnectCallback, err) => {
            // The first argument of _disconnectClient is the force flag (see the 'reconnecting'
            // state): force the disconnect when we got here because of an error.
            this._disconnectClient(!!err, () => {
              this._fsm.transition('disconnected', disconnectCallback, err);
            });
          },
          '*': () => this._fsm.deferUntilTransition()
        },
        reconnecting: {
          _onEnter: (callback) => {
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_033: [The `updateSharedAccessSignature` method shall disconnect and reconnect the mqtt client with the new `sharedAccessSignature`.]*/
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_035: [The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds.]*/
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_036: [The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails.]*/
            // Set once the timeout has handed the operation over to 'forceDisconnect', so that a
            // late completion of the graceful disconnect does not start a second reconnect.
            let switched = false;
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_41_002: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_41_003: [The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails after timing out.]*/
            const disconnectTimeout = setTimeout(() => {
              debugErrors('disconnecting mqtt client timed out. Force disconnecting.');
              switched = true;
              this._fsm.handle('forceDisconnect', callback);
            }, 30000);

            // When experiencing message loss due to high throughput, the force flag in
            // disconnectClient can be set to true to cause messages to be properly dropped,
            // and thus re-sent. Making this the default is under investigation.
            debug('disconnecting mqtt client');
            this._disconnectClient(false, () => {
              clearTimeout(disconnectTimeout);
              if (!switched) {
                reconnectClient(callback);
              }
            });
          },
          forceDisconnect: (callback) => {
            debug('force disconnecting mqtt client');
            this._disconnectClient(true, () => reconnectClient(callback));
          },
          '*': () => this._fsm.deferUntilTransition()
        }
      }
    });

    // Trace every state change of the state machine.
    this._fsm.on('transition', (data) => {
      debug(data.fromState + ' -> ' + data.toState + ' (' + data.action + ')');
    });
  }