constructor()

in device/transport/mqtt/src/mqtt.ts [54:412]


  constructor(authenticationProvider: AuthenticationProvider, mqttBase?: any) {
    super();
    this._firstConnection = true;
    this._authenticationProvider = authenticationProvider;
    /*Codes_SRS_NODE_DEVICE_MQTT_16_071: [The constructor shall subscribe to the `newTokenAvailable` event of the `authenticationProvider` passed as an argument if it uses tokens for authentication.]*/
    if (this._authenticationProvider.type === AuthenticationType.Token) {
      (<any>this._authenticationProvider).on('newTokenAvailable', (newCredentials) => {
        /*Codes_SRS_NODE_DEVICE_MQTT_16_072: [If the `newTokenAvailable` event is fired, the `Mqtt` object shall do nothing if it isn't connected.]*/
        /*Codes_SRS_NODE_DEVICE_MQTT_16_073: [If the `newTokenAvailable` event is fired, the `Mqtt` object shall call `updateSharedAccessSignature` on the `mqttBase` object if it is connected.]*/
        this._fsm.handle('updateSharedAccessSignature', newCredentials.sharedAccessSignature, (err) => {
          /*Codes_SRS_NODE_DEVICE_MQTT_16_074: [If updating the shared access signature fails when the `newTokenAvailable` event is fired, the `Mqtt` state machine shall fire a `disconnect` event.]*/
          if (err) {
            this.emit('disconnect', err);
          }
        });
      });
    }

    /* Codes_SRS_NODE_DEVICE_MQTT_18_025: [ If the Mqtt constructor receives a second parameter, it shall be used as a mqttBase in place of mqtt.js ]*/
    if (mqttBase) {
      this._mqtt = mqttBase;
    } else {
      this._mqtt = new MqttBase();
    }

    /* Codes_SRS_NODE_DEVICE_MQTT_18_026: When MqttTransport fires the close event, the Mqtt object shall emit a disconnect event */
    this._mqtt.on('error', (err) => {
      debug('on close');
      this._fsm.handle('disconnect', () => {
        this.emit('disconnect', err);
      });
    });

    this._mqtt.on('message', this._dispatchMqttMessage.bind(this));

    this._twinClient = new MqttTwinClient(this._mqtt);

    /*Codes_SRS_NODE_DEVICE_MQTT_16_081: [The `Mqtt` constructor shall subscribe to the `MqttTwinClient` `twinDesiredPropertiesUpdates`.]*/
    /*Codes_SRS_NODE_DEVICE_MQTT_16_082: [A `twinDesiredPropertiesUpdates` shall be emitted by the `Mqtt` object for each `twinDesiredPropertiesUpdates` event received from the `MqttTwinClient` with the same payload. **/
    this._twinClient.on('twinDesiredPropertiesUpdate', (patch) => this.emit('twinDesiredPropertiesUpdate', patch));

    this._fsm = new machina.Fsm({
      initialState: 'disconnected',
      states: {
        disconnected: {
          _onEnter: (disconnectedCallback, err, result) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_16_085: [Once the MQTT transport is disconnected and if it is using a token authentication provider, the `stop` method of the `AuthenticationProvider` object shall be called to stop any running timer.]*/
            if (this._authenticationProvider.type === AuthenticationType.Token) {
              (this._authenticationProvider as SharedAccessKeyAuthenticationProvider).stop();
            }

            if (disconnectedCallback) {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_MQTT_16_019: [The `connect` method shall calls its callback with an `Error` that has been translated from the `MqttBase` error using the `translateError` method if it fails to establish a connection.]*/
                disconnectedCallback(translateError(err));
              } else {
                disconnectedCallback(undefined, result);
              }
            } else {
              /* Codes_SRS_NODE_DEVICE_MQTT_18_026: When MqttTransport fires the close event, the Mqtt object shall emit a disconnect event */
              this.emit('disconnect', err);
            }
          },
          /*Codes_SRS_NODE_DEVICE_MQTT_16_021: [The `disconnect` method shall call its callback immediately with a `null` argument and a `results.Disconnected` second argument if `MqttBase` is already disconnected.]*/
          disconnect: (callback) => callback(null, new results.Disconnected()),
          connect: (callback) => {
            this._fsm.transition('connecting', callback);
          },
          sendEvent: (message, outputProps, sendEventCallback) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_16_023: [The `sendEvent` method shall connect the Mqtt connection if it is disconnected.]*/
            /*Codes_SRS_NODE_DEVICE_MQTT_18_045: [The `sendOutputEvent` method shall connect the Mqtt connection if it is disconnected. ]*/
            this._fsm.handle('connect', (err) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_MQTT_16_024: [The `sendEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to establish a connection.]*/
                /*Codes_SRS_NODE_DEVICE_MQTT_18_046: [The `sendOutputEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to establish a connection. ]*/
                sendEventCallback(translateError(err));
              } else {
                this._fsm.handle('sendEvent', message, outputProps, sendEventCallback);
              }
            });
          },
          updateSharedAccessSignature: (_sharedAccessSignature, callback) => { callback(null, new results.SharedAccessSignatureUpdated(false)); },
          sendMethodResponse: (_response, callback) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_16_034: [The `sendMethodResponse` method shall fail with a `NotConnectedError` if the `MqttBase` object is not connected.]*/
            callback(new errors.NotConnectedError('device disconnected: the service already considers the method has failed'));
          },
          getTwin: (callback) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_16_075: [`getTwin` shall establish the MQTT connection by calling `connect` on the `MqttBase` object if it is disconnected.]*/
            this._fsm.handle('connect', (err) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_MQTT_16_076: [`getTwin` shall call its callback with an error if it fails to connect the transport]*/
                callback(err);
              } else {
                this._fsm.handle('getTwin', callback);
              }
            });
          },
          updateTwinReportedProperties: (patch, callback) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_16_078: [`updateTwinReportedProperties` shall establish the MQTT connection by calling `connect` on the `MqttBase` object if it is disconnected.]*/
            this._fsm.handle('connect', (err) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_MQTT_16_079: [`updateTwinReportedProperties` shall call its callback with an error if it fails to connect the transport]*/
                callback(err);
              } else {
                this._fsm.handle('updateTwinReportedProperties', patch, callback);
              }
            });
          },
          enableC2D: (callback) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_16_047: [`enableC2D` shall connect the MQTT connection if it is disconnected.]*/
            this._fsm.handle('connect', (err) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_MQTT_16_048: [`enableC2D` shall calls its callback with an `Error` object if it fails to connect.]*/
                callback(err);
              } else {
                this._fsm.handle('enableC2D', callback);
              }
            });
          },
          enableMethods: (callback) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_16_038: [`enableMethods` shall connect the MQTT connection if it is disconnected.]*/
            this._fsm.handle('connect', (err) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_MQTT_16_039: [`enableMethods` shall calls its callback with an `Error` object if it fails to connect.]*/
                callback(err);
              } else {
                this._fsm.handle('enableMethods', callback);
              }
            });
          },
          enableInputMessages: (callback) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_18_059: [ `enableInputMessages` shall connect the MQTT connection if it is disconnected. ]*/
            this._fsm.handle('connect', (err) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_MQTT_18_060: [ `enableInputMessages` shall calls its callback with an `Error` object if it fails to connect. ]*/
                callback(err);
              } else {
                this._fsm.handle('enableInputMessages', callback);
              }
            });
          },
          disableC2D: (callback) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_16_041: [`disableC2D` shall call its callback immediately if the MQTT connection is already disconnected.]*/
            callback();
          },
          disableMethods: (callback) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_16_044: [`disableMethods` shall call its callback immediately if the MQTT connection is already disconnected.]*/
            callback();
          },
          enableTwinDesiredPropertiesUpdates: (callback) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_16_057: [`enableTwinDesiredPropertiesUpdates` shall connect the MQTT connection if it is disconnected.]*/
            this._fsm.handle('connect', (err) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_MQTT_16_058: [`enableTwinDesiredPropertiesUpdates` shall calls its callback with an `Error` object if it fails to connect.]*/
                callback(err);
              } else {
                this._fsm.handle('enableTwinDesiredPropertiesUpdates', callback);
              }
            });
          },
          disableTwinDesiredPropertiesUpdates: (callback) => callback(),
          disableInputMessages: (callback) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_18_064: [ `disableInputMessages` shall call its callback immediately if the MQTT connection is already disconnected. ]*/
            callback();
          },
        },
        connecting: {
          _onEnter: (connectCallback) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_16_067: [The `connect` method shall call the `getDeviceCredentials` method of the `AuthenticationProvider` object passed to the constructor to obtain the credentials of the device.]*/
            this._authenticationProvider.getDeviceCredentials((err, credentials) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_MQTT_16_068: [The `connect` method shall call its callback with the error returned by `getDeviceCredentials` if it fails to return the device credentials.]*/
                this._fsm.transition('disconnected', connectCallback, err);
              } else {
                this._configureEndpoints(credentials);
                this._ensureAgentString(() => {
                  const baseConfig = this._getBaseTransportConfig(credentials);
                  this._mqtt.connect(baseConfig, (err, result) => {
                    debug('connect');
                    if (err) {
                      debugErrors('Connect error: ' + err);
                      if (this._firstConnection) {
                        /* Codes_SRS_NODE_DEVICE_MQTT_41_006: [The `connect` method shall call its callback with an `UnauthorizedError` returned by the primary call to `connect` in the base MQTT client.]*/
                        this._fsm.transition('disconnected', connectCallback, new Error('Failure on first connection (Not authorized): ' + err.message));
                      } else {
                        /* Codes_SRS_NODE_DEVICE_MQTT_41_007: [The `connect` method shall call its callback with the error returned by the non-primary call to `connect` in the base MQTT client.]*/
                        this._fsm.transition('disconnected', connectCallback, err);
                      }
                    } else {
                      this._firstConnection = false;
                      this._fsm.transition('connected', connectCallback, result);
                    }
                  });
                });
              }
            });
          },
          disconnect: (disconnectCallback) => {
            this._fsm.transition('disconnecting', disconnectCallback);
          },

          /*Codes_SRS_NODE_DEVICE_MQTT_16_025: [If `sendEvent` is called while `MqttBase` is establishing the connection, it shall wait until the connection is established and then send the event.]*/
          /*Codes_SRS_NODE_DEVICE_MQTT_16_035: [If `sendEvent` is called while `MqttBase` is establishing the connection, and `MqttBase` fails to establish the connection, then sendEvent shall fail.]*/
          /*Codes_SRS_NODE_DEVICE_MQTT_18_047: [If `sendOutputEvent` is called while `MqttBase` is establishing the connection, it shall wait until the connection is established and then send the event. ]*/
          /*Codes_SRS_NODE_DEVICE_MQTT_18_048: [If `sendOutputEvent` is called while `MqttBase` is establishing the connection, and `MqttBase` fails to establish the connection, then sendEvent shall fail. ]*/
          '*': () => this._fsm.deferUntilTransition()
        },
        connected: {
          _onEnter: (connectedCallback, connectResult) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_41_016: [ The `connect` method shall emit `connected` once the transport is connected ]*/
            this.emit('connected');
            /*Codes_SRS_NODE_DEVICE_MQTT_16_020: [The `connect` method shall call its callback with a `null` error parameter and a `results.Connected` response if `MqttBase` successfully connects.]*/
            if (connectedCallback) connectedCallback(null, new results.Connected(connectResult));
          },
          /*Codes_SRS_NODE_DEVICE_MQTT_16_018: [The `connect` method shall call its callback immediately if `MqttBase` is already connected.]*/
          connect: (connectCallback) => connectCallback(null, new results.Connected()),
          disconnect: (disconnectCallback) => {
            this._fsm.transition('disconnecting', disconnectCallback);
          },
          sendEvent: (message, outputProps, sendEventCallback) => {
            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_008: [The `sendEvent` method shall use a topic formatted using the following convention: `devices/<deviceId>/messages/events/`.]*/
            let topic = this._getEventTopicFromMessage(message, outputProps);
            if (outputProps) {
              topic += '/';
            }

            // This will not catch all messages that exceed IoT Hub limits because properties contribute the size as well.
            if ((message?.data?.length ?? 0) > 256 * 1024) {
              sendEventCallback(new errors.MessageTooLargeError('Message size is greater than 256KiB'));
              return;
            }

            /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_010: [** The `sendEvent` method shall use QoS level of 1.]*/
            this._mqtt.publish(topic, message.data, { qos: 1, retain: false }, (err, result) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_MQTT_16_027: [The `sendEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to publish the message.]*/
                /*Codes_SRS_NODE_DEVICE_MQTT_18_050: [The `sendOutputEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to publish the message. ]*/
                sendEventCallback(translateError(err));
              } else {
                /*Codes_SRS_NODE_DEVICE_MQTT_41_004 [ The `sendEvent` method shall call its callback with a `MessageEnqueued` ]*/
                /*Codes_SRS_NODE_DEVICE_MQTT_41_005 [ The `sendOutputEvent` method shall call its callback with a `MessageEnqueued` ]*/
                sendEventCallback(null, new results.MessageEnqueued(result));
              }
            });
          },
          updateSharedAccessSignature: (sharedAccessSignature, callback) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_16_028: [The `updateSharedAccessSignature` method shall call the `updateSharedAccessSignature` method on the `MqttBase` object if it is connected.]*/
            this._mqtt.updateSharedAccessSignature(sharedAccessSignature, (err) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_MQTT_16_009: [The `updateSharedAccessSignature` method shall call the `done` method with an `Error` object if `MqttBase.updateSharedAccessSignature` fails.]*/
                this._fsm.transition('disconnected', callback, err);
              } else {
                /*Codes_SRS_NODE_DEVICE_MQTT_16_010: [The `updateSharedAccessSignature` method shall call the `done` callback with a `null` error object and a `SharedAccessSignatureUpdated` object with its `needToReconnect` property set to `false`, if `MqttBase.updateSharedAccessSignature` succeeds.]*/
                callback(null, new results.SharedAccessSignatureUpdated(false));
              }
            });
          },
          sendMethodResponse: (response, callback) => {
            // Codes_SRS_NODE_DEVICE_MQTT_13_002: [ sendMethodResponse shall build an MQTT topic name in the format: $iothub/methods/res/<STATUS>/?$rid=<REQUEST ID>&<PROPERTIES> where <STATUS> is response.status. ]
            // Codes_SRS_NODE_DEVICE_MQTT_13_003: [ sendMethodResponse shall build an MQTT topic name in the format: $iothub/methods/res/<STATUS>/?$rid=<REQUEST ID>&<PROPERTIES> where <REQUEST ID> is response.requestId. ]
            // Codes_SRS_NODE_DEVICE_MQTT_13_004: [ sendMethodResponse shall build an MQTT topic name in the format: $iothub/methods/res/<STATUS>/?$rid=<REQUEST ID>&<PROPERTIES> where <PROPERTIES> is URL encoded. ]
            const topicName = util.format(
              TOPIC_RESPONSE_PUBLISH_FORMAT,
              'methods',
              response.status,
              response.requestId
            );

            debug('sending response using topic: ' + topicName);
            debug(JSON.stringify(response.payload));
            // publish the response message
            this._mqtt.publish(topicName, JSON.stringify(response.payload), { qos: 0, retain: false }, (err) => {
              // Codes_SRS_NODE_DEVICE_MQTT_13_006: [ If the MQTT publish fails then an error shall be returned via the done callback's first parameter. ]
              // Codes_SRS_NODE_DEVICE_MQTT_13_007: [ If the MQTT publish is successful then the done callback shall be invoked passing null for the first parameter. ]
              callback(err ? translateError(err) : null);
            });
          },
          /* Codes_SRS_NODE_DEVICE_MQTT_41_008: [`enableC2D` shall not subscribe multiple times if already subscribed.]*/
          enableC2D: (callback) => {
            if (this._topics.message && this._topics.message.subscribed) {
              debug('already subscribed to `message`, doing nothing...');
              callback();
            } else {
              this._setupSubscription(this._topics.message, 1, callback);
            }
          },
          /* Codes_SRS_NODE_DEVICE_MQTT_41_009: [`enableMethods` shall not subscribe multiple times if already subscribed.]*/
          enableMethods: (callback) => {
            if (this._topics.method && this._topics.method.subscribed) {
              debug('already subscribed to `method`, doing nothing...');
              callback();
            } else {
              this._setupSubscription(this._topics.method, 0, callback);
            }
          },
          /* Codes_SRS_NODE_DEVICE_MQTT_41_010: [`enableInputMessages` shall not subscribe multiple times if already subscribed.]*/
          enableInputMessages: (callback) => {
            if (this._topics.inputMessage && this._topics.inputMessage.subscribed) {
              debug('already subscribed to `inputMessages`, doing nothing...');
              callback();
            } else {
              this._setupSubscription(this._topics.inputMessage, 1, callback);
            }
          },
          /* Codes_SRS_NODE_DEVICE_MQTT_41_011: [`disableC2D` shall unsubscribe from the topic for C2D messages only if it is currently subscribed.]*/
          disableC2D: (callback) => {
            if (this._topics.message && this._topics.message.subscribed) {
              this._removeSubscription(this._topics.message, callback);
            } else {
              debug('not subscribed to `message`, so doing nothing...');
              callback();
            }
          },
          /* Codes_SRS_NODE_DEVICE_MQTT_41_012: [`disableMethods` shall unsubscribe from the topic for direct methods only if it is currently subscribed.]*/
          disableMethods: (callback) => {
            if (this._topics.method && this._topics.method.subscribed) {
              this._removeSubscription(this._topics.method, callback);
            } else {
              debug('not subscribed to `method`, so doing nothing...');
              callback();
            }
          },
          /* Codes_SRS_NODE_DEVICE_MQTT_41_013: [`disableInputMessages` shall unsubscribe from the topic for inputMessages only if it is currently subscribed.]*/
          disableInputMessages: (callback) => {
            if (this._topics.inputMessage && this._topics.inputMessage.subscribed) {
              this._removeSubscription(this._topics.inputMessage, callback);
            } else {
              debug('not subscribed to `method`, so doing nothing...');
              callback();
            }
          },
          /*Codes_SRS_NODE_DEVICE_MQTT_16_077: [`getTwin` shall call the `getTwin` method on the `MqttTwinClient` object and pass it its callback.]*/
          getTwin: (callback) => this._twinClient.getTwin(callback),
          /*Codes_SRS_NODE_DEVICE_MQTT_16_080: [`updateTwinReportedProperties` shall call the `updateTwinReportedProperties` method on the `MqttTwinClient` object and pass it its callback.]*/
          updateTwinReportedProperties: (patch, callback) => this._twinClient.updateTwinReportedProperties(patch, callback),
          /*Codes_SRS_NODE_DEVICE_MQTT_16_059: [`enableTwinDesiredPropertiesUpdates` shall call the `enableTwinDesiredPropertiesUpdates` on the `MqttTwinClient` object created by the constructor and pass it its callback.]*/
          enableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.enableTwinDesiredPropertiesUpdates(callback),
          /*Codes_SRS_NODE_DEVICE_MQTT_16_083: [`disableTwinDesiredPropertiesUpdates` shall call the `disableTwinDesiredPropertiesUpdates` on the `MqttTwinClient` object created by the constructor and pass it its callback.]*/
          disableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.disableTwinDesiredPropertiesUpdates(callback),
        },
        disconnecting: {
          _onEnter: (disconnectCallback, _err) => {
            /*Codes_SRS_NODE_DEVICE_MQTT_16_001: [The `disconnect` method should call the `disconnect` method on `MqttBase`.]*/
            /*Codes_SRS_NODE_DEVICE_MQTT_16_022: [The `disconnect` method shall call its callback with a `null` error parameter and a `results.Disconnected` response if `MqttBase` successfully disconnects if not disconnected already.]*/
            this._mqtt.disconnect((err, result) => {
              this._fsm.transition('disconnected', disconnectCallback, err, new results.Disconnected(result));
            });
          },
          /*Codes_SRS_NODE_DEVICE_MQTT_16_026: [If `sendEvent` is called while `MqttBase` is disconnecting, it shall wait until the disconnection is complete and then try to connect again and send the event. ]*/
          /*Codes_SRS_NODE_DEVICE_MQTT_18_049: [If `sendOutputEvent` is called while `MqttBase` is disconnecting, it shall wait until the disconnection is complete and then try to connect again and send the event. ]*/
          '*': () => this._fsm.deferUntilTransition()
        }
      }
    });

    this._fsm.on('transition', (data) => {
      debug(data.fromState + ' -> ' + data.toState + ' (' + data.action + ')');
    });
  }