constructor()

in device/transport/amqp/src/amqp_twin_client.ts [61:252]


  /**
   * Builds the twin client and the state machine that owns the twin sender/receiver links.
   *
   * The state machine has four states:
   * - `detached`:  no links; any twin operation first transitions to `attaching` and is replayed once attached.
   * - `attaching`: retrieves credentials, computes the twin endpoint, then attaches the sender and receiver links.
   * - `attached`:  twin operations are forwarded to `_sendTwinRequest`.
   * - `detaching`: both links are detached, then the machine goes back to `detached`.
   *
   * @param authenticationProvider Queried through `getDeviceCredentials` on every attach; the returned
   *                               `deviceId` (and `moduleId`, when set) select the twin endpoint.
   * @param client                 Transport object exposing `attachSenderLink`, `attachReceiverLink`,
   *                               `detachSenderLink` and `detachReceiverLink`.
   */
  constructor(authenticationProvider: AuthenticationProvider, client: any) {
    super();
    this._client = client;
    this._authenticationProvider = authenticationProvider;
    this._senderLink = null;
    this._receiverLink = null;
    this._pendingTwinRequests = {};

    // Dispatches every message received on the receiver link.
    this._messageHandler = (message: AmqpMessage): void => {
      //
      // The ONLY time we should see a message on the receiver link without a correlationId is if the message is a desired property delta update.
      //
      const correlationId: string = message.correlation_id;
      if (correlationId) {
        this._onResponseMessage(message);
      } else if (Object.prototype.hasOwnProperty.call(message, 'body')) {
        this._onDesiredPropertyDelta(message);
      } else {
        //
        // Can't be any message we know what to do with.  Just drop it on the floor.
        //
        debug('malformed response message received from service: ' + JSON.stringify(message));
      }
    };

    // Shared by both links: routes link errors into the state machine.
    this._errorHandler = (err: Error): void => this._fsm.handle('handleLinkError', err);

    this._fsm = new machina.Fsm({
      namespace: 'amqp-twin-client',
      initialState: 'detached',
      states: {
        detached: {
          // Reports the outcome of the transition that led here: through the callback when there is one,
          // otherwise (unsolicited failure) through the `error` event.
          _onEnter: (err, detachCallback) => {
            if (detachCallback) {
              detachCallback(err);
            } else {
              if (err) {
                this.emit('error', err);
              }
            }
          },
          getTwin: (callback) => {
            /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_007: [The `getTwin` method shall attach the sender link if it's not already attached.]*/
            /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_009: [THe `getTwin` method shall attach the receiver link if it's not already attached.]*/
            this._fsm.transition('attaching', (err) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_008: [If attaching the sender link fails, the `getTwin` method shall call its callback with the error that caused the failure.]*/
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_010: [If attaching the receiver link fails, the `getTwin` method shall call its callback with the error that caused the failure.]*/
                callback(err);
              } else {
                this._fsm.handle('getTwin', callback);
              }
            });
          },
          updateTwinReportedProperties: (patch, callback) => {
            /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_015: [The `updateTwinReportedProperties` method shall attach the sender link if it's not already attached.]*/
            /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_017: [THe `updateTwinReportedProperties` method shall attach the receiver link if it's not already attached.]*/
            this._fsm.transition('attaching', (err) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_016: [If attaching the sender link fails, the `updateTwinReportedProperties` method shall call its callback with the error that caused the failure.]*/
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_018: [If attaching the receiver link fails, the `updateTwinReportedProperties` method shall call its callback with the error that caused the failure.]*/
                callback(err);
              } else {
                this._fsm.handle('updateTwinReportedProperties', patch, callback);
              }
            });
          },
          enableTwinDesiredPropertiesUpdates: (callback) => {
            /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_023: [The `enableTwinDesiredPropertiesUpdates` method shall attach the sender link if it's not already attached.]*/
            /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_025: [The `enableTwinDesiredPropertiesUpdates` method shall attach the receiver link if it's not already attached.]*/
            this._fsm.transition('attaching', (err) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_024: [If attaching the sender link fails, the `enableTwinDesiredPropertiesUpdates` method shall call its callback with the error that caused the failure.]*/
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_026: [If attaching the receiver link fails, the `enableTwinDesiredPropertiesUpdates` method shall call its callback with the error that caused the failure.]*/
                callback(err);
              } else {
                this._fsm.handle('enableTwinDesiredPropertiesUpdates', callback);
              }
            });
          },
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_031: [The `disableTwinDesiredPropertiesUpdates` method shall call its callback immediately and with no arguments if the links are detached.]*/
          disableTwinDesiredPropertiesUpdates: (callback) => callback(),
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_004: [The `detach` method shall call its `callback` immediately if the links are already detached.]*/
          detach: (callback) => callback()
        },
        attaching: {
          // Any failure goes back to `detached`, which hands the error to `attachCallback`.
          _onEnter: (attachCallback) => {
            /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_007: [The `attach` method shall call the `getDeviceCredentials` method on the `authenticationProvider` object passed as an argument to the constructor to retrieve the device id.]*/
            this._authenticationProvider.getDeviceCredentials((err, credentials) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_008: [The `attach` method shall call its callback with an error if the call to `getDeviceCredentials` fails with an error.]*/
                this._fsm.transition('detached', err, attachCallback);
              } else {
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_007: [The endpoint argument for attachReceiverLink shall be `/device/<deviceId>/twin`.] */
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_18_001: [If a `moduleId` value was set in the device's connection string, the endpoint argument for `attachReceiverLink` shall be `/devices/<deviceId>/modules/<moduleId>/twin`]*/
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_009: [The endpoint argument for attachSenderLink shall be `/device/<deviceId>/twin`.] */
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_18_002: [If a `moduleId` value was set in the device's connection string, the endpoint argument for `attachSenderLink` shall be `/device/<deviceId>/modules/<moduleId>/twin`.]*/
                if (credentials.moduleId) {
                  this._endpoint = endpoint.moduleTwinPath(credentials.deviceId, credentials.moduleId);
                } else {
                  this._endpoint = endpoint.deviceTwinPath(credentials.deviceId);
                }
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_006: [When a listener is added for the `response` event, and the `post` event is NOT already subscribed, upstream and downstream links are established via calls to `attachReceiverLink` and `attachSenderLink`.] */
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_012: [When a listener is added for the `post` event, and the `response` event is NOT already subscribed, upstream and downstream links are established via calls to `attachReceiverLink` and `attachSenderLine`.] */
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_036: [The same correlationId shall be used for both the sender and receiver links.]*/
                const linkCorrelationId: string  = uuid.v4().toString();
                // The sender link is attached first; the receiver link is only attempted once the sender link is up.
                this._client.attachSenderLink( this._endpoint, this._generateTwinLinkProperties(linkCorrelationId), (senderLinkError?: Error, senderTransportObject?: any): void => {
                  if (senderLinkError) {
                    /* Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_022: [If an error occurs on establishing the upstream or downstream link then the `error` event shall be emitted.] */
                    this._fsm.transition('detached', senderLinkError, attachCallback);
                  } else {
                    this._senderLink = senderTransportObject;
                    this._senderLink.on('error', this._errorHandler);
                    this._client.attachReceiverLink( this._endpoint, this._generateTwinLinkProperties(linkCorrelationId, true), (receiverLinkError?: Error, receiverTransportObject?: any): void => {
                      if (receiverLinkError) {
                        // NOTE(review): the sender link attached above keeps its `error` listener and is not detached on this path - confirm this is intended.
                        this._fsm.transition('detached', receiverLinkError, attachCallback);
                      } else {
                        this._receiverLink = receiverTransportObject;
                        this._receiverLink.on('message', this._messageHandler);
                        this._receiverLink.on('error', this._errorHandler);
                        this._fsm.transition('attached', attachCallback);
                      }
                    });
                  }
                });
              }
            });
          },
          handleLinkError: (err, callback) => this._fsm.transition('detaching', err, callback),
          detach: (callback) => this._fsm.transition('detaching', null, callback),
          // Every other input is queued and replayed after the next state transition.
          '*': () => this._fsm.deferUntilTransition()
        },
        attached: {
          // Signals the successful attach to whoever requested it.
          _onEnter: (callback) => {
            callback();
          },
          // No callback here: the error eventually surfaces through the `error` event emitted on entering `detached`.
          handleLinkError: (err) => {
            this._fsm.transition('detaching', err);
          },
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_011: [** The `getTwin` method shall send an `AmqpMessage` using the `SenderLink.send` method with the following annotations and properties:
          - `operation` annotation set to `GET`.
          - `resource` annotation set to `undefined`
          - `correlationId` property set to a uuid
          - `body` set to ` `.]*/
          getTwin: (callback) => this._sendTwinRequest(TwinMethod.GET, undefined, ' ', callback),
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_019: [The `updateTwinReportedProperties` method shall send an `AmqpMessage` using the `SenderLink.send` method with the following annotations and properties:
          - `operation` annotation set to `PATCH`.
          - `resource` annotation set to `/properties/reported`
          - `correlationId` property set to a uuid
          - `body` set to the stringified patch object.]*/
          updateTwinReportedProperties: (patch, callback) => this._sendTwinRequest(TwinMethod.PATCH, '/properties/reported', JSON.stringify(patch), callback),
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_027: [The `enableTwinDesiredPropertiesUpdates` method shall send an `AmqpMessage` using the `SenderLink.send` method with the following annotations and properties:
          - `operation` annotation set to `PUT`.
          - `resource` annotation set to `/notifications/twin/properties/desired`
          - `correlationId` property set to a uuid
          - `body` set to `undefined`.]*/
          enableTwinDesiredPropertiesUpdates: (callback) => this._sendTwinRequest(TwinMethod.PUT, '/notifications/twin/properties/desired', ' ', callback),
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_032: [The `disableTwinDesiredPropertiesUpdates` method shall send an `AmqpMessage` using the `SenderLink.send` method with the following annotations and properties:
          - `operation` annotation set to `DELETE`.
          - `resource` annotation set to `/notifications/twin/properties/desired`
          - `correlationId` property set to a uuid
          - `body` set to `undefined`.]*/
          disableTwinDesiredPropertiesUpdates: (callback) => this._sendTwinRequest(TwinMethod.DELETE, '/notifications/twin/properties/desired', ' ', callback),
          detach: (callback) => this._fsm.transition('detaching', null, callback)
        },
        detaching: {
          // Detaches the sender link, then the receiver link, and finally moves to `detached` with the first
          // error seen (the error that triggered the detach takes precedence over detach failures).
          _onEnter: (err, detachCallback) => {
            // Capture the links up front: the detach callbacks below run asynchronously.
            const senderLink = this._senderLink;
            const receiverLink = this._receiverLink;
            /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_005: [The `detach` method shall detach the links and call its `callback` with no arguments if the links are successfully detached.]*/
            this._client.detachSenderLink(this._endpoint, (detachSenderError: Error, _result?: any) => {
              // This state can be entered from `attaching` (`detach` or `handleLinkError`) before the sender
              // link has been assigned, in which case there is no listener to remove.
              if (senderLink) {
                senderLink.removeListener('error', this._errorHandler);
              }
              if (detachSenderError) {
                debugErrors('we received an error for the detach of the upstream link during the disconnect.  Moving on to the downstream link. Error=' + detachSenderError);
              }
              this._client.detachReceiverLink(this._endpoint,  (detachReceiverError: Error, _result?: any) => {
                // Same as above: the receiver link may never have been attached.
                if (receiverLink) {
                  receiverLink.removeListener('message', this._messageHandler);
                  receiverLink.removeListener('error', this._errorHandler);
                }
                if (detachReceiverError) {
                  debugErrors('we received an error for the detach of the downstream link during the disconnect. Error=' + detachReceiverError);
                }
                /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_006: [The `detach` method shall call its `callback` with an `Error` if detaching either of the links fail.]*/
                const possibleError = err || detachSenderError || detachReceiverError;
                this._fsm.transition('detached', possibleError, detachCallback);
              });
            });
          },
          // Inputs received while detaching are queued and replayed after the next state transition.
          '*': () => this._fsm.deferUntilTransition()
        }
      }
    });
  }