constructor()

in provisioning/transport/mqtt/src/mqtt.ts [49:217]


  constructor(mqttBase?: MqttBase) {
    super();
    this._mqttBase = mqttBase || new MqttBase();
    this._config.pollingInterval = ProvisioningDeviceConstants.defaultPollingInterval;

    const responseHandler = (topic: string, payload: any) => {
      let payloadString: string = payload.toString('ascii');
      debug('message received on ' + topic);
      debug('request payload is: ' + payloadString);

      /* Codes_SRS_NODE_PROVISIONING_MQTT_18_010: [ When waiting for responses, `registrationRequest` shall watch for messages with a topic named $dps/registrations/res/<status>/?$rid=<rid>.] */
      /* Codes_SRS_NODE_PROVISIONING_MQTT_18_024: [ When waiting for responses, `queryOperationStatus` shall watch for messages with a topic named $dps/registrations/res/<status>/?$rid=<rid>.] */
      let match = topic.match(/^\$dps\/registrations\/res\/(.*)\/\?(.*)$/);

      if (!!match && match.length === 3) {
        let queryParameters = queryString.parse(match[2]);
        if (queryParameters.$rid) {
          let rid: string = queryParameters.$rid as string;
          if (this._operations[rid]) {
            let status: number = Number(match[1]);
            let payloadJson: any = JSON.parse(payloadString);
            let handler = this._operations[rid].handler;
            let statusString = this._operations[rid].statusString;
            let operationId = this._operations[rid].operationId;
            let retryAfterInMilliseconds: number = this._config.pollingInterval;
            const retryParameter = 'retry-after';
            /* Codes_SRS_NODE_PROVISIONING_MQTT_06_005: [ If the response to the `queryOperationStatus` contains a query parameter of `retry-after` that value * 1000 shall be the value of `callback` `pollingInterval` argument, otherwise default.] */
            /* Codes_SRS_NODE_PROVISIONING_MQTT_06_006: [ If the response to the `registrationRequest` contains a query parameter of `retry-after` that value * 1000 shall be the value of `callback` `pollingInterval` argument, otherwise default.] */
            if (queryParameters[retryParameter]) {
              retryAfterInMilliseconds = Number(queryParameters[retryParameter]) * 1000;
            }
            delete this._operations[rid];
            if (status < 300) {
              /* Codes_SRS_NODE_PROVISIONING_MQTT_18_013: [ When `registrationRequest` receives a successful response from the service, it shall call `callback` passing in null and the response.] */
              /* Codes_SRS_NODE_PROVISIONING_MQTT_18_027: [ When `queryOperationStatus` receives a successful response from the service, it shall call `callback` passing in null and the response.] */
              handler(null, payloadJson, retryAfterInMilliseconds);
            } else if (status >= 429) {
              /*Codes_SRS_NODE_PROVISIONING_MQTT_06_003: [ When `registrationRequest` receives a response with status >429, it shall invoke `callback` with a result object containing property `status` with a value `registering` and no `operationId` property.] */
              /*Codes_SRS_NODE_PROVISIONING_MQTT_06_004: [ When `queryOperationStatus` receives a response with status >429, it shall invoke `callback` with a result object containing property `status` with a value `assigning` and `operationId` property with value of the passed to the request.] */
              handler(null, {status: statusString, operationId: operationId}, retryAfterInMilliseconds);
            } else {
              /* Codes_SRS_NODE_PROVISIONING_MQTT_18_012: [ If `registrationRequest` receives a response with status >= 300 and <429, it shall consider the request failed and create an error using `translateError`.] */
              /* Codes_SRS_NODE_PROVISIONING_MQTT_18_015: [ When `registrationRequest` receives an error from the service, it shall call `callback` passing in the error.] */
              /* Codes_SRS_NODE_PROVISIONING_MQTT_18_026: [ If `queryOperationStatus` receives a response with status >= 300 and <429, it shall consider the query failed and create an error using `translateError`.] */
              /* Codes_SRS_NODE_PROVISIONING_MQTT_18_029: [ When `queryOperationStatus` receives an error from the service, it shall call `callback` passing in the error.] */
              handler(translateError('incoming message failure', status, payloadJson, { topic: topic, payload: payloadJson }));
            }
          } else {
            debug('received an unknown request id: ' + rid + ' topic: ' + topic);
          }
        } else {
          debug('received message with no request id. Topic is: ' + topic);
        }
      } else {
        debug('received a topic string with insufficient content: ' + topic);
      }
    };

    const errorHandler = (err: Error) => {
      this._fsm.handle('disconnect', err);
    };

    this._mqttBase.on('message', responseHandler);
    this._mqttBase.on('error', errorHandler);

    this._fsm = new machina.Fsm({
      namespace: 'provisioning-transport-mqtt',
      initialState: 'disconnected',
      states: {
        disconnected: {
          _onEnter: (err, callback) => {
            if (callback) {
              callback(err);
            } else if (err) {
              this.emit('error', err);
            }
          },
          registrationRequest: (request, rid, callback) => {
            this._operations[rid] = {handler: callback, statusString: 'registering'};
            /* Codes_SRS_NODE_PROVISIONING_MQTT_18_002: [ If the transport is not connected, `registrationRequest` shall connect it and subscribe to the response topic.] */
            this._fsm.handle('connect', request, (err) => {
              if (err) {
                callback(err);
              } else {
                this._fsm.handle('registrationRequest', request, rid, callback);
              }
            });
          },
          queryOperationStatus: (request, rid, operationId, callback) => {
            this._operations[rid] = {handler: callback, statusString: 'assigning', operationId: operationId};
            /* Codes_SRS_NODE_PROVISIONING_MQTT_18_016: [ If the transport is not connected, `queryOperationStatus` shall connect it and subscribe to the response topic.] */
            this._fsm.handle('connect', request, (err) => {
              if (err) {
                callback(err);
              } else {
                this._fsm.handle('queryOperationStatus', request, rid, operationId, callback);
              }
            });
          },
          connect: (request, callback) => this._fsm.transition('connecting', request, callback),
          /* Codes_SRS_NODE_PROVISIONING_MQTT_18_052: [ If `disconnect` is called while the transport is disconnected, it will call `callback` immediately. ] */
          disconnect: (err, callback) => callback(err),
          /* Codes_SRS_NODE_PROVISIONING_MQTT_18_030: [ If `cancel` is called while the transport is disconnected, it will call `callback` immediately.] */
          cancel: (callback) => callback()
        },
        connecting: {
          _onEnter: (request, callback) => {
            this._connect(request, (err) => {
              if (err) {
                /* Codes_SRS_NODE_PROVISIONING_MQTT_18_051: [ If either `_mqttBase.connect` or `_mqttBase.subscribe` fails, `mqtt` will disconnect the transport. ] */
                this._fsm.transition('disconnecting', err, callback);
              } else {
                this._fsm.transition('connected', null, request, null, callback);
              }
            });
          },
          cancel: (callback) => {
            /* Codes_SRS_NODE_PROVISIONING_MQTT_18_062: [ If `cancel` is called while the transport is in the process of connecting, it shell disconnect transport and cancel the operation that initiated the connection. ] */
            this._cancelAllOperations();
            this._fsm.transition('disconnecting', null, callback);
          },
          disconnect: (err, callback) => {
            /* Codes_SRS_NODE_PROVISIONING_MQTT_18_061: [ If `disconnect` is called while the transport is in the process of connecting, it shell disconnect connection and cancel the operation that initiated the connection. ] */
            this._cancelAllOperations();
            this._fsm.transition('disconnecting', err, callback);
          },
          '*': () => this._fsm.deferUntilTransition()
        },
        connected: {
          _onEnter: (err, request, result, callback) => callback(err, result, request),
          registrationRequest: (request, rid, callback) => {
            this._operations[rid] = {handler: callback, statusString: 'registering'};
            this._sendRegistrationRequest(request, rid, (err, result) => {
              callback(err, result, request);
            });
          },
          queryOperationStatus: (request, rid, operationId, callback) => {
            this._operations[rid] = {handler: callback, statusString: 'assigning', operationId: operationId};
            this._sendOperationStatusQuery(request, rid, operationId, (err, result) => {
              callback(err, result, request);
            });
          },
          cancel: (callback) => {
            /* Codes_SRS_NODE_PROVISIONING_MQTT_18_032: [ If `cancel` is called while the transport is connected and idle, it will call `callback` immediately.] */
            /* Codes_SRS_NODE_PROVISIONING_MQTT_18_033: [ If `cancel` is called while the transport is in the middle of a `registrationRequest` operation, it will stop listening for a response and cause `registrationRequest` call it's `callback` passing an `OperationCancelledError` error.] */
            /* Codes_SRS_NODE_PROVISIONING_MQTT_18_034: [ If `cancel` is called while the transport is in the middle of a `queryOperationStatus` operation, it will stop listening for a response and cause `registrationRequest` call it's `callback` passing an `OperationCancelledError` error.] */
            this._cancelAllOperations();
            callback();
          },
          /* Codes_SRS_NODE_PROVISIONING_MQTT_18_054: [ If `disconnect` is called while the transport is connected and idle, it shall disconnect. ] */
          /* Codes_SRS_NODE_PROVISIONING_MQTT_18_055: [ If `disconnect` is called while the transport is in the middle of a `registrationRequest` operation, it shall cancel the `registrationRequest` operation and disconnect the transport. ] */
          /* Codes_SRS_NODE_PROVISIONING_MQTT_18_056: [ If `disconnect` is called while the transport is in the middle of a `queryOperationStatus` operation, it shall cancel the `queryOperationStatus` operation and disconnect the transport. ] */
          disconnect: (err, callback) => {
            this._cancelAllOperations();
            this._fsm.transition('disconnecting', err, callback);
          }
        },
        disconnecting: {
          _onEnter: (err, callback) => {
            this._disconnect((disconnectErr) => {
              this._fsm.transition('disconnected', err || disconnectErr, callback);
            });
          },
          '*': () => this._fsm.deferUntilTransition()
        }
      }
    });
    this._fsm.on('transition', (data) => debug('MQTT State Machine: ' + data.fromState + ' -> ' + data.toState + ' (' + data.action + ')'));
  }