constructor()

in device/transport/mqtt/src/mqtt_twin_client.ts [52:135]


  /**
   * Creates a twin client layered on top of an existing MQTT transport object.
   *
   * The constructor stores the transport, starts listening to its `message`
   * event, prepares the two topic descriptors (twin responses and desired
   * properties updates) and builds the state machine that tracks, for each
   * topic descriptor, whether it is unsubscribed, subscribing, subscribed or
   * unsubscribing.
   *
   * @param client MQTT transport object used to receive messages and to
   *               subscribe to / unsubscribe from the twin topics.
   */
  constructor(client: MqttBase) {
    super();
    /*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_18_001: [The `MqttTwinClient` constructor shall accept a `client` object.]*/
    this._mqtt = client;

    /*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_001: [The `MqttTwinClient` constructor shall immediately subscribe to the `message` event of the `client` object.]*/
    this._mqtt.on('message', this._onMqttMessage.bind(this));

    // Topic descriptors handed to the state machine below; each one pairs the
    // transport with the topic it should be subscribed to.
    this._responseTopic = {
      mqttClient: this._mqtt,
      topic: responseTopic
    };

    this._desiredPropertiesUpdatesTopic = {
      mqttClient: this._mqtt,
      topic: desiredPropertiesUpdatesTopic
    };

    // Completion callback shape shared by every handler of the state machine.
    type Done = (err?: Error) => void;

    // One behavioral FSM drives every topic: the per-topic state is carried by
    // the TopicSubscription object passed as first argument of each handler.
    this._topicFsm = new machina.BehavioralFsm({
      initialState: 'unsubscribed',
      states: {
        unsubscribed: {
          _onEnter(_sub: TopicSubscription, err: Error, done: Done): void {
            // The callback may be absent (presumably on the very first entry
            // into this state - confirm against machina), hence the guard.
            if (!done) {
              return;
            }
            done(err);
          },
          subscribe(sub: TopicSubscription, done: Done): void {
            this.transition(sub, 'subscribing', done);
          },
          unsubscribe(_sub: TopicSubscription, done: Done): void {
            // Open question kept from the original author: when subscriptions
            // are restored because cleanSession is false, a user may technically
            // want to unsubscribe even though subscribe was never called.
            // For now this is reported as an immediate success.
            done();
          }
        },
        subscribing: {
          _onEnter(sub: TopicSubscription, done: Done): void {
            sub.mqttClient.subscribe(sub.topic, { qos: 0 }, (err, granted) => {
              if (err) {
                // Going back to 'unsubscribed' forwards the error to the caller.
                this.transition(sub, 'unsubscribed', err, done);
                return;
              }
              debug('subscribed to response topic: ' + JSON.stringify(granted));
              this.transition(sub, 'subscribed', done);
            });
          },
          // Anything requested while the subscription is in flight is deferred
          // until the state machine transitions.
          '*'(sub: TopicSubscription): void {
            this.deferUntilTransition(sub);
          }
        },
        subscribed: {
          _onEnter(_sub: TopicSubscription, done: Done): void {
            done();
          },
          subscribe(_sub: TopicSubscription, done: Done): void {
            // Already subscribed: nothing to do besides reporting success.
            done();
          },
          unsubscribe(sub: TopicSubscription, done: Done): void {
            this.transition(sub, 'unsubscribing', done);
          }
        },
        unsubscribing: {
          _onEnter(sub: TopicSubscription, done: Done): void {
            sub.mqttClient.unsubscribe(sub.topic, (err, _ignored) => {
              if (!err) {
                debug('unsubscribed from: ' + sub.topic);
              } else {
                debugErrors('failed to unsubscribe: ' + err);
              }
              // Success or failure, the topic ends up in 'unsubscribed'; the
              // error (if any) travels along with the callback.
              this.transition(sub, 'unsubscribed', err, done);
            });
          },
          // Anything requested while the unsubscription is in flight is
          // deferred until the state machine transitions.
          '*'(sub: TopicSubscription): void {
            this.deferUntilTransition(sub);
          }
        }
      },
      // Entry points used by the rest of the class: they dispatch the matching
      // input to whatever state the given topic is currently in.
      subscribe(sub: TopicSubscription, done: Done): void {
        this.handle(sub, 'subscribe', done);
      },
      unsubscribe(sub: TopicSubscription, done: Done): void {
        this.handle(sub, 'unsubscribe', done);
      }
    });
  }