constructor()

in provisioning/transport/amqp/src/amqp.ts [66:480]


  constructor(amqpBase?: Base) {
    super();
    this._amqpBase = amqpBase || new Base(true);
    this._config.pollingInterval = ProvisioningDeviceConstants.defaultPollingInterval;

    const amqpErrorListener = (err) => this._amqpStateMachine.handle('amqpError', err);

    const responseHandler = (msg) => {
      debug('got message with correlation_id: ' + msg.correlation_id);
      /*Codes_SRS_NODE_PROVISIONING_AMQP_16_007: [The `registrationRequest` method shall call its callback with a `RegistrationResult` object parsed from the body of the response message which `correlation_id` matches the `correlation_id` of the request message sent on the sender link.]*/
      /*Codes_SRS_NODE_PROVISIONING_AMQP_16_017: [The `queryOperationStatus` method shall call its callback with a `RegistrationResult` object parsed from the body of the response message which `correlation_id` matches the `correlation_id` of the request message sent on the sender link.]*/
      const registrationResult = JSON.parse(msg.body.content);
      if (this._operations[msg.correlation_id]) {
        debug('Got the registration/operationStatus message we were looking for.');
        const requestCallback = this._operations[msg.correlation_id];
        delete this._operations[msg.correlation_id];
        let retryAfterInMilliseconds: number;
        /*Codes_SRS_NODE_PROVISIONING_AMQP_06_010: [If the amqp response to a request contains the application property`retry-after`, it will be interpreted as the number of seconds that should elapse before the next attempted operation.  Otherwise default.] */
        if (msg.application_properties && msg.application_properties[MessagePropertyNames.retryAfter]) {
          retryAfterInMilliseconds = Number(msg.application_properties[MessagePropertyNames.retryAfter]) * 1000;
          debug('registration/operation retry after value of: ' + msg.application_properties[MessagePropertyNames.retryAfter]);
        } else {
          debug('registration/operation retry after value defaulting.');
          retryAfterInMilliseconds = this._config.pollingInterval;
        }
        requestCallback(null, registrationResult, msg, retryAfterInMilliseconds);
      } else {
        debug('ignoring message with unknown correlation_id');
      }
    };

    this._amqpStateMachine = new machina.Fsm({
      namespace: 'provisioning-amqp',
      initialState: 'disconnected',
      states: {
        disconnected: {
          _onEnter: (err, registrationResult, callback) => {
            if (callback) {
              callback(err, registrationResult);
            } else if (err) {
              this.emit('error', err);
            }
          },
          registrationRequest: (request, correlationId, callback) => {
            this._operations[correlationId] = callback;
            this._amqpStateMachine.transition('connectingX509OrSymmetricKey', request, (err) => {
              if (err) {
                delete this._operations[correlationId];
                callback(err);
              } else {
                this._amqpStateMachine.handle('registrationRequest', request, correlationId, callback);
              }
            });
          },
          queryOperationStatus: (request, correlationId, operationId, callback) => {
            this._operations[correlationId] = callback;
            this._amqpStateMachine.transition('connectingX509OrSymmetricKey', request, (err) => {
              if (err) {
                delete this._operations[correlationId];
                callback(err);
              } else {
                this._amqpStateMachine.handle('queryOperationStatus', request, correlationId, operationId, callback);
              }
            });
          },
          getAuthenticationChallenge: (request, callback) => this._amqpStateMachine.transition('connectingTpm', request, callback),
          /*Codes_SRS_NODE_PROVISIONING_AMQP_18_017: [ `respondToAuthenticationChallenge` shall call `callback` with an `InvalidOperationError` if called before calling `getAuthenticationChallenge`. ]*/
          respondToAuthenticationChallenge: (_request, _sasToken, callback) => callback(new errors.InvalidOperationError('Cannot respond to challenge while disconnected.')),
          /*Codes_SRS_NODE_PROVISIONING_AMQP_18_003: [ `cancel` shall call its callback immediately if the AMQP connection is disconnected. ] */
          cancel: (callback) => callback(),
          /*Codes_SRS_NODE_PROVISIONING_AMQP_16_022: [`disconnect` shall call its callback immediately if the AMQP connection is disconnected.]*/
          disconnect: (callback) => callback()
        },
        connectingX509OrSymmetricKey: {
          _onEnter: (request, callback) => {
            /*Codes_SRS_NODE_PROVISIONING_AMQP_16_002: [The `registrationRequest` method shall connect the AMQP client with the certificate and key given in the `auth` parameter of the previously called `setAuthentication` method.]*/
            /*Codes_SRS_NODE_PROVISIONING_AMQP_16_012: [The `queryOperationStatus` method shall connect the AMQP client with the certificate and key given in the `auth` parameter of the previously called `setAuthentication` method. **]**]*/
            let config: AmqpBaseTransportConfig = {
              uri: this._getConnectionUri(request),
              sslOptions: this._x509Auth,
              userAgentString: ProvisioningDeviceConstants.userAgent
            };
            /*Codes_SRS_NODE_PROVISIONING_AMQP_99_001: [The `registrationRequest` method shall connect the AMQP client with the agent given in the `webSocketAgent` parameter of the previously called `setTransportOptions` method.]*/
            config.sslOptions = config.sslOptions || {};
            config.sslOptions.agent = this._config.webSocketAgent;
            if (this._sas) {
              /*Codes_SRS_NODE_PROVISIONING_AMQP_06_002: [** The `registrationRequest` method shall connect the amqp client, if utilizing the passed in sas from setSharedAccessSignature, shall in the connect options set the username to:
                ```
                <scopeId>/registrations/<registrationId>
                ```
                and shall set the password to the passed in sas token.
                ] */
              config.policyOverride = {
                username: request.idScope + '/registrations/' + request.registrationId,
                password: this._sas
              };
            }
            this._amqpBase.connect(config, (err) => {
              if (err) {
                debugErrors('_amqpBase.connect failed');
                debugErrors(err);
                /*Codes_SRS_NODE_PROVISIONING_AMQP_16_008: [The `registrationRequest` method shall call its callback with an error if the transport fails to connect.]*/
                /*Codes_SRS_NODE_PROVISIONING_AMQP_16_018: [The `queryOperationStatus` method shall call its callback with an error if the transport fails to connect.]*/
                this._amqpStateMachine.transition('disconnected', err, null, callback);
              } else {
                this._amqpStateMachine.transition('attachingLinks', request, callback);
              }
            });
          },
          /*Codes_SRS_NODE_PROVISIONING_AMQP_18_005: [ `cancel` shall disconnect the AMQP connection and cancel the operation that initiated a connection if called while the connection is in process. ] */
          cancel: (callback) => {
            this._cancelAllOperations();
            this._amqpStateMachine.transition('disconnecting', null, null, callback);
          },
          /*Codes_SRS_NODE_PROVISIONING_AMQP_18_009: [ `disconnect` shall disconnect the AMQP connection and cancel the operation that initiated a connection if called while the connection is in process. ] */
          disconnect: (callback) => {
            this._cancelAllOperations();
            this._amqpStateMachine.transition('disconnecting', null, null, callback);
          },
          '*': () => this._amqpStateMachine.deferUntilTransition()
        },

        connectingTpm: {
          _onEnter: (request, callback) => this._getAuthChallenge(request, callback),

          respondToAuthenticationChallenge: (request, sasToken, callback) => {
            let completionCompleteHandler = this._amqpStateMachine.on('tpmConnectionComplete', (err) => {
              completionCompleteHandler.off();
              if (err) {
                /*Codes_SRS_NODE_PROVISIONING_AMQP_18_019: [ `respondToAuthenticationChallenge` shall call `callback` with an Error object if the connection has a failure. ]*/
                this._amqpStateMachine.transition('disconnected', err, null, callback);
              } else {
                /*Codes_SRS_NODE_PROVISIONING_AMQP_18_020: [ `respondToAuthenticationChallenge` shall attach sender and receiver links if the connection completes successfully. ]*/
                this._amqpStateMachine.transition('attachingLinks', request, callback);
              }
            });

            this._respondToAuthChallenge(sasToken);
          },

          tpmConnectionComplete: (err) => {
            this._amqpStateMachine.emit('tpmConnectionComplete', err);
          },

          /*Codes_SRS_NODE_PROVISIONING_AMQP_18_005: [ `cancel` shall disconnect the AMQP connection and cancel the operation that initiated a connection if called while the connection is in process. ] */
          cancel: (callback) => {
            this._cancelAllOperations();
            this._amqpStateMachine.transition('disconnecting', null, null, callback);
          },
          /*Codes_SRS_NODE_PROVISIONING_AMQP_18_009: [ `disconnect` shall disconnect the AMQP connection and cancel the operation that initiated a connection if called while the connection is in process. ] */
          disconnect: (callback) => {
            this._cancelAllOperations();
            this._amqpStateMachine.transition('disconnecting', null, null, callback);
          },
          '*': () => this._amqpStateMachine.deferUntilTransition()
        },

        attachingLinks: {
          _onEnter: (request, callback) => {
            const linkEndpoint = request.idScope + '/registrations/' + request.registrationId;
            const linkOptions = {
              properties: {
                'com.microsoft:api-version' : ProvisioningDeviceConstants.apiVersion
              }
            };

            /*Codes_SRS_NODE_PROVISIONING_AMQP_16_004: [The `registrationRequest` method shall attach a receiver link on the `<idScope>/registrations/<registrationId>` endpoint with the following properties:
            ```
            com.microsoft:api-version: <API_VERSION>
            com.microsoft:client-version: <CLIENT_VERSION>
            ```]*/
            /*Codes_SRS_NODE_PROVISIONING_AMQP_16_014: [The `queryOperationStatus` method shall attach a receiver link on the `<idScope>/registrations/<registrationId>` endpoint with the following properties:
            ```
            com.microsoft:api-version: <API_VERSION>
            com.microsoft:client-version: <CLIENT_VERSION>
            ```*/
            this._amqpBase.attachReceiverLink(linkEndpoint, linkOptions, (err, receiverLink) => {
              if (err) {
                debugErrors('_amqpBase.attachReceiverLink failed');
                debugErrors(err);
                /*Codes_SRS_NODE_PROVISIONING_AMQP_16_010: [The `registrationRequest` method shall call its callback with an error if the transport fails to attach the receiver link.]*/
                /*Codes_SRS_NODE_PROVISIONING_AMQP_16_020: [The `queryOperationStatus` method shall call its callback with an error if the transport fails to attach the receiver link.]*/
                /*Codes_SRS_NODE_PROVISIONING_AMQP_18_022: [ `respondToAuthenticationChallenge` shall call its callback passing an `Error` object if the transport fails to attach the receiver link. ]*/
                this._amqpStateMachine.transition('disconnecting', err, null, callback);
              } else {
                this._receiverLink = receiverLink;
                this._receiverLink.on('error', amqpErrorListener);
                /*Codes_SRS_NODE_PROVISIONING_AMQP_16_006: [The `registrationRequest` method shall listen for the response on the receiver link and accept it when it comes.]*/
                /*Codes_SRS_NODE_PROVISIONING_AMQP_16_016: [The `queryOperationStatus` method shall listen for the response on the receiver link and accept it when it comes.]*/
                this._receiverLink.on('message', responseHandler);

                /*Codes_SRS_NODE_PROVISIONING_AMQP_16_003: [The `registrationRequest` method shall attach a sender link on the `<idScope>/registrations/<registrationId>` endpoint with the following properties:
                ```
                com.microsoft:api-version: <API_VERSION>
                com.microsoft:client-version: <CLIENT_VERSION>
                ```]*/
                /*Codes_SRS_NODE_PROVISIONING_AMQP_16_013: [The `queryOperationStatus` method shall attach a sender link on the `<idScope>/registrations/<registrationId>` endpoint with the following properties:
                ```
                com.microsoft:api-version: <API_VERSION>
                com.microsoft:client-version: <CLIENT_VERSION>
                ```*/
                this._amqpBase.attachSenderLink(linkEndpoint, linkOptions, (err, senderLink) => {
                  if (err) {
                    debugErrors('_amqpBase.attachSenderLink failed');
                    debugErrors(err);
                    /*Codes_SRS_NODE_PROVISIONING_AMQP_16_009: [The `registrationRequest` method shall call its callback with an error if the transport fails to attach the sender link.]*/
                    /*Codes_SRS_NODE_PROVISIONING_AMQP_16_019: [The `queryOperationStatus` method shall call its callback with an error if the transport fails to attach the sender link.]*/
                    /*Codes_SRS_NODE_PROVISIONING_AMQP_18_021: [ `respondToAuthenticationChallenge` shall call its callback passing an `Error` object if the transport fails to attach the sender link. ]*/
                    this._amqpStateMachine.transition('disconnecting', err, null, callback);
                  } else {
                    this._senderLink = senderLink;
                    this._senderLink.on('error', amqpErrorListener);
                    /*Codes_SRS_NODE_PROVISIONING_AMQP_18_023: [ `respondToAuthenticationChallenge` shall call its callback passing `null` if the AMQP connection is established and links are attached. ]*/
                    this._amqpStateMachine.transition('connected', callback);
                  }
                });
              }
            });
          },
          /*Codes_SRS_NODE_PROVISIONING_AMQP_18_005: [ `cancel` shall disconnect the AMQP connection and cancel the operation that initiated a connection if called while the connection is in process. ] */
          cancel: (callback) => {
            this._cancelAllOperations();
            this._amqpStateMachine.transition('disconnecting', null, null, callback);
          },
          /*Codes_SRS_NODE_PROVISIONING_AMQP_18_009: [ `disconnect` shall disconnect the AMQP connection and cancel the operation that initiated a connection if called while the connection is in process. ] */
          disconnect: (callback) => {
            this._cancelAllOperations();
            this._amqpStateMachine.transition('disconnecting', null, null, callback);
          },
          '*': () => this._amqpStateMachine.deferUntilTransition()
        },
        connected: {
          _onEnter: (callback) => callback(),
          registrationRequest: (request, correlationId, callback) => {

            /*Codes_SRS_NODE_PROVISIONING_AMQP_16_005: [The `registrationRequest` method shall send a message on the previously attached sender link with a `correlation_id` set to a newly generated UUID and the following application properties:
            ```
            iotdps-operation-type: iotdps-register;
            iotdps-forceRegistration: <true or false>;
            ```
            ]*/

            let requestMessage = new AmqpMessage();
            /*Codes_SRS_NODE_PROVISIONING_AMQP_06_003: [ The `registrationRequest` will send a body in the message which contains a stringified JSON object with a `registrationId` property.] */
            let requestBody: DeviceRegistration = {registrationId: request.registrationId};
            /*Codes_SRS_NODE_PROVISIONING_AMQP_06_004: [The `registrationRequest` will, if utilizing TPM attestation, send a `tpm` property with the endorsement and storage key in the JSON body.] */
            if (this._endorsementKey) {
              requestBody.tpm = {endorsementKey: this._endorsementKey.toString('base64')};
              if (this._storageRootKey) {
                requestBody.tpm.storageRootKey = this._storageRootKey.toString('base64');
              }
            }
            /*Codes_SRS_NODE_PROVISIONING_AMQP_06_005: [The `registrationRequest` will, if utilizing custom allocation data, send a `payload` property in the JSON body.] */
            if (request.payload) {
              requestBody.payload = request.payload;
            }
            requestMessage.body = rheaMessage.data_section(Buffer.from(JSON.stringify(requestBody)));
            requestMessage.application_properties = {};
            requestMessage.application_properties[MessagePropertyNames.OperationType] = DeviceOperations.Register;
            requestMessage.application_properties[MessagePropertyNames.ForceRegistration] = !!request.forceRegistration;
            requestMessage.correlation_id = correlationId;

            debug('initial registration request: ' + JSON.stringify(requestMessage));
            this._operations[requestMessage.correlation_id] = callback;
            this._senderLink.send(requestMessage, (err) => {
              if (err) {
                delete this._operations[requestMessage.correlation_id];
                const translatedError = translateError('registration failure', err);
                /*Codes_SRS_NODE_PROVISIONING_AMQP_06_007: [If the `registrationRequest` send request is rejected with an `InternalError` or `ThrottlingError`, the result.status value will be set with `registering` and the callback will be invoked with *no* error object.] */
                if ((translatedError instanceof errors.InternalServerError) || ((translatedError as AmqpTransportError) instanceof errors.ThrottlingError)) {
                  debugErrors('retryable error on registration: ' + err.name);
                  let retryAfterInMilliseconds: number;
                  /*Codes_SRS_NODE_PROVISIONING_AMQP_06_009: [If the `registrationRequest` rejection error contains the info property`retry-after`, it will be interpreted as the number of seconds that should elapse before the next attempted operation.  Otherwise default.] */
                  if ((err as any).info && (err as any).info[MessagePropertyNames.retryAfter]) {
                    retryAfterInMilliseconds = Number(((err as any).info[MessagePropertyNames.retryAfter] as string)) * 1000;
                  } else {
                    retryAfterInMilliseconds = this._config.pollingInterval;
                  }
                  callback(null, {status: 'registering'}, null, retryAfterInMilliseconds);
                } else {
                  debugErrors('non-retryable error on registration: ' + err);
                  /*Codes_SRS_NODE_PROVISIONING_AMQP_16_011: [The `registrationRequest` method shall call its callback with an error if the transport fails to send the request message.]*/
                  callback(err);
                }
              } else {
                debug('registration request sent with correlation_id: ' + requestMessage.correlation_id);
              }
            });
          },
          queryOperationStatus: (_request, correlationId, operationId, callback) => {

            /*Codes_SRS_NODE_PROVISIONING_AMQP_16_015: [The `queryOperationStatus` method shall send a message on the pre-attached sender link with a `correlation_id` set to a newly generated UUID and the following application properties:
            ```
            iotdps-operation-type: iotdps-get-operationstatus;
            iotdps-operation-id: <operationId>;
            ```*/
            let requestMessage = new AmqpMessage();
            requestMessage.body = '';
            requestMessage.application_properties = {};
            requestMessage.application_properties[MessagePropertyNames.OperationType] = DeviceOperations.GetOperationStatus;
            requestMessage.application_properties[MessagePropertyNames.OperationId] = operationId;
            requestMessage.correlation_id = correlationId;

            debug('registration status request: ' + JSON.stringify(requestMessage));
            this._operations[requestMessage.correlation_id] = callback;
            this._senderLink.send(requestMessage, (err) => {
              if (err) {
                delete this._operations[requestMessage.correlation_id];
                const translatedError = translateError('query operation status failure', err);
                /*Codes_SRS_NODE_PROVISIONING_AMQP_06_006: [If the `queryOperationStatus` send request is rejected with an `InternalError` or `ThrottlingError`, the result.status value will be set with `assigning` and the callback will be invoked with *no* error object.] */
                if ((translatedError instanceof errors.InternalServerError) || ((translatedError as AmqpTransportError) instanceof errors.ThrottlingError)) {
                  debugErrors('retryable error on queryOperationStatus: ' + err);
                  let retryAfterInMilliseconds: number;
                  /*Codes_SRS_NODE_PROVISIONING_AMQP_06_008: [If the `queryOperationsStatus` rejection error contains the info property`retry-after`, it will be interpreted as the number of seconds that should elapse before the next attempted operation.  Otherwise default.] */
                  if ((err as any).info && (err as any).info[MessagePropertyNames.retryAfter]) {
                    retryAfterInMilliseconds = Number(((err as any).info[MessagePropertyNames.retryAfter] as string)) * 1000;
                  } else {
                    retryAfterInMilliseconds = this._config.pollingInterval;
                  }
                  callback(null, {status: 'assigning', operationId: operationId}, null, retryAfterInMilliseconds);
                } else {
                  debugErrors('non-retryable error on queryOperationStatus: ' + err);
                  /*Codes_SRS_NODE_PROVISIONING_AMQP_16_021: [The `queryOperationStatus` method shall call its callback with an error if the transport fails to send the request message.]*/
                  callback(err);
                }
              } else {
                debug('registration status request sent with correlation_id: ' + requestMessage.correlation_id);
              }
            });
          },
          amqpError: (err) => {
            this._amqpStateMachine.transition('disconnecting', err);
          },
          cancel: (callback) => {
            /*Codes_SRS_NODE_PROVISIONING_AMQP_18_004: [ `cancel` shall call its callback immediately if the AMQP connection is connected but idle. ] */
            /*Codes_SRS_NODE_PROVISIONING_AMQP_18_006: [ `cancel` shall cause a `registrationRequest` operation that is in progress to call its callback passing an `OperationCancelledError` object. ] */
            /*Codes_SRS_NODE_PROVISIONING_AMQP_18_007: [ `cancel` shall cause a `queryOperationStatus` operation that is in progress to call its callback passing an `OperationCancelledError` object. ] */
            /*Codes_SRS_NODE_PROVISIONING_AMQP_18_008: [ `cancel` shall not disconnect the AMQP transport. ] */
            this._cancelAllOperations();
            callback();
          },
          disconnect: (callback) => {
            /*Codes_SRS_NODE_PROVISIONING_AMQP_18_001: [ `disconnect` shall cause a `registrationRequest` operation that is in progress to call its callback passing an `OperationCancelledError` object. ] */
            /*Codes_SRS_NODE_PROVISIONING_AMQP_18_002: [ `disconnect` shall cause a `queryOperationStatus` operation that is in progress to call its callback passing an `OperationCancelledError` object. ] */
            this._cancelAllOperations();
            this._amqpStateMachine.transition('disconnecting', null, null, callback);
          }
        },
        disconnecting: {
          _onEnter: (err, registrationResult, callback) => {
            /*Codes_SRS_NODE_PROVISIONING_AMQP_16_023: [`disconnect` shall detach the sender and receiver links and disconnect the AMQP connection.]*/
            let finalError = err;
            async.series([
              (callback) => {
                if (this._senderLink) {
                  const tmpLink = this._senderLink;
                  this._senderLink = null;

                  if (finalError) {
                    tmpLink.removeListener('error', amqpErrorListener);
                    tmpLink.forceDetach(finalError);
                    callback();
                  } else {
                    tmpLink.detach((err) => {
                      finalError = finalError || err;
                      tmpLink.removeListener('error', amqpErrorListener);
                      callback();
                    });
                  }
                } else {
                  callback();
                }
              },
              (callback) => {
                if (this._receiverLink) {
                  const tmpLink = this._receiverLink;
                  this._receiverLink = null;

                  if (finalError) {
                    tmpLink.removeListener('error', amqpErrorListener);
                    tmpLink.removeListener('message', responseHandler);
                    tmpLink.forceDetach(finalError);
                    callback();
                  } else {
                    tmpLink.detach((err) => {
                      finalError = finalError || err;
                      tmpLink.removeListener('error', amqpErrorListener);
                      tmpLink.removeListener('message', responseHandler);
                      callback();
                    });
                  }
                } else {
                  callback();
                }
              },
              (callback) => {
                this._amqpBase.disconnect((err) => {
                  finalError = finalError || err;
                  callback();
                });
              }
            ], () => {
              /*Codes_SRS_NODE_PROVISIONING_AMQP_16_024: [`disconnect` shall call its callback with no arguments if all detach/disconnect operations were successful.]*/
              /*Codes_SRS_NODE_PROVISIONING_AMQP_16_025: [`disconnect` shall call its callback with the error passed from the first unsuccessful detach/disconnect operation if one of those fail.]*/
              this._amqpStateMachine.transition('disconnected', finalError, registrationResult, callback);
            });
          },
          '*': () => this._amqpStateMachine.deferUntilTransition()
        }
      }
    });

    this._amqpStateMachine.on('transition', (data) => debug('AMQP State Machine: ' + data.fromState + ' -> ' + data.toState + ' (' + data.action + ')'));
    this._amqpStateMachine.on('handling', (data) => debug('AMQP State Machine: handling ' + data.inputType));
  }