constructor()

in src/amqp.ts [84:450]


  /**
   * Creates the service-side AMQP transport and builds the state machine that drives it.
   *
   * The state machine has five states: `disconnected` (initial), `connecting`,
   * `authenticating`, `authenticated` and `disconnecting`. Operations requested while
   * disconnected (`send`, `getFeedbackReceiver`, `getFileNotificationReceiver`) first
   * trigger a `connect` and are then replayed once the connection is authenticated.
   *
   * @param config    Transport configuration. The state machine reads `sharedAccessSignature`,
   *                  `tokenCredential` and `tokenScope` from it when authenticating.
   * @param amqpBase  Optional AMQP base transport to use; when omitted a `new Base(true)` is created.
   */
  constructor(config: Client.TransportConfigOptions, amqpBase?: Base) {
    super();
    this._amqp = amqpBase ? amqpBase : new Base(true);
    this._config = config;
    this._renewalTimeout = null;
    // Any disconnection reported by the base transport is routed to the state machine as an `amqpError` input.
    this._amqp.setDisconnectHandler((err) => {
      this._fsm.handle('amqpError', err);
    });

    // Per-link error listeners: each one only logs and forgets the link so that the next
    // operation attaches a fresh one. They are kept as fields so they can be removed on disconnect.
    this._c2dErrorListener = (err) => {
      debug('Error on the C2D link: ' + err.toString());
      this._c2dLink = null;
    };

    this._feedbackErrorListener = (err) => {
      debug('Error on the message feedback link: ' + err.toString());
      this._feedbackReceiver = null;
    };

    this._fileNotificationErrorListener = (err) => {
      debug('Error on the file notification link: ' + err.toString());
      this._fileNotificationReceiver = null;
    };

    this._fsm = new machina.Fsm({
      namespace: 'azure-iothub:Amqp',
      initialState: 'disconnected',
      states: {
        disconnected: {
          // Entered with the outcome of the previous connect/disconnect attempt: the error (if any)
          // goes to the callback when there is one, otherwise it is surfaced as a 'disconnect' event.
          _onEnter: (err, callback) => {
            if (err) {
              if (callback) {
                callback(err);
              } else {
                this.emit('disconnect', err);
              }
            } else {
              if (callback) {
                callback();
              }
            }
          },
          connect: (callback) => {
            this._fsm.transition('connecting', callback);
          },
          disconnect: (callback) => callback(),
          send: (amqpMessage, deviceEndpoint, callback) => {
            /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_025: [The `send` method shall connect and authenticate the transport if it is disconnected.]*/
            this._fsm.handle('connect', (err) => {
              if (err) {
                /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_026: [The `send` method shall call its callback with an error if connecting and/or authenticating the transport fails.]*/
                callback(err);
              } else {
                this._fsm.handle('send', amqpMessage, deviceEndpoint, callback);
              }
            });
          },
          getFeedbackReceiver: (callback) => {
            /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_033: [The `getFeedbackReceiver` method shall connect and authenticate the transport if it is disconnected.]*/
            this._fsm.handle('connect', (err) => {
              if (err) {
                /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_034: [The `getFeedbackReceiver` method shall call its callback with an error if the transport fails to connect or authenticate.]*/
                callback(err);
              } else {
                this._fsm.handle('getFeedbackReceiver', callback);
              }
            });
          },
          getFileNotificationReceiver: (callback) => {
            /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_036: [The `getFileNotificationReceiver` method shall connect and authenticate the transport if it is disconnected.]*/
            this._fsm.handle('connect', (err) => {
              if (err) {
                /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_037: [The `getFileNotificationReceiver` method shall call its callback with an error if the transport fails to connect or authenticate.]*/
                callback(err);
              } else {
                this._fsm.handle('getFileNotificationReceiver', callback);
              }
            });
          },
          updateSharedAccessSignature: (_updatedSAS, callback) => {
            /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_032: [The `updateSharedAccessSignature` shall not establish a connection if the transport is disconnected, but should use the new shared access signature on the next manually initiated connection attempt.]*/
            callback();
          },
          updateAccessToken: (_tokenValue, callback) => {
            // Nothing to push while disconnected: just acknowledge.
            callback();
          },
          amqpError: (err) => {
            debug('Late arriving error received while in disconnected state.');
            if (err) {
              debug(err.toString());
            }
          }
        },
        connecting: {
          _onEnter: (callback) => {
            const config: AmqpBaseTransportConfig = {
              uri: this._getConnectionUri(),
              userAgentString: packageJson.name + '/' + packageJson.version
            };
            debug('connecting');
            this._amqp.connect(config, (err, _result) => {
              if (err) {
                debug('failed to connect: ' + err.toString());
                this._fsm.transition('disconnected', err, callback);
              } else {
                debug('connected');
                this._fsm.transition('authenticating', callback);
              }
            });
          },
          disconnect: (callback) => {
            this._fsm.transition('disconnecting', null, callback);
          },
          amqpError: (err) => {
            this._fsm.transition('disconnecting', err);
          },
          // Every other input is queued and replayed after the next state transition.
          '*': () => this._fsm.deferUntilTransition()
        },
        authenticating: {
          _onEnter: (callback) => {
            /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_06_001: [`initializeCBS` shall be invoked.]*/
            this._amqp.initializeCBS((err) => {
              if (err) {
                debug('error trying to initialize CBS: ' + err.toString());
                /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_06_002: [If `initializeCBS` is not successful then the client will remain disconnected and the callback, if provided, will be invoked with an error object.]*/
                this._fsm.transition('disconnecting', err, callback);
              } else {
                debug('CBS initialized');
                /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_06_003: [If `initializeCBS` is successful, `putToken` shall be invoked with the first parameter audience, created from the sr of the sas signature, the next parameter of the actual sas, and a callback.]*/

                if (this._config.sharedAccessSignature) {
                  const audience = SharedAccessSignature.parse(this._config.sharedAccessSignature.toString(), ['sr', 'sig', 'se']).sr;
                  // A SAS given as a plain string is used verbatim; a SharedAccessSignature object is extended before use.
                  const isApplicationSuppliedSas = typeof (this._config.sharedAccessSignature) === 'string';

                  const sasToken = isApplicationSuppliedSas ? this._config.sharedAccessSignature as string : (this._config.sharedAccessSignature as SharedAccessSignature).extend(anHourFromNow());
                  this._amqp.putToken(audience, sasToken, (err) => {
                    if (err) {
                      /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_06_004: [** If `putToken` is not successful then the client will remain disconnected and the callback, if provided, will be invoked with an error object.]*/
                      this._fsm.transition('disconnecting', err, callback);
                    } else {
                      this._fsm.transition('authenticated', isApplicationSuppliedSas, callback);
                    }
                  });
                } else if (this._config.tokenCredential) {
                  this.getToken().then((accessToken) => {
                    const tokenValue = this._bearerTokenPrefix + accessToken.token;
                    this._amqp.putToken(this._config.tokenScope, tokenValue, (err) => {
                      if (err) {
                        /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_06_004: [** If `putToken` is not successful then the client will remain disconnected and the callback, if provided, will be invoked with an error object.]*/
                        this._fsm.transition('disconnecting', err, callback);
                      } else {
                        // Schedule the renewal after two thirds of the token's remaining lifetime, but never sooner than one second.
                        this._renewalNumberOfMilliseconds = Math.max(1000, (2 / 3) * (accessToken.expiresOnTimestamp - Date.now()));
                        this._fsm.transition('authenticated', false, callback);
                      }
                    });
                  }).catch((err) => {
                    this._fsm.transition('disconnecting', err, callback);
                  });
                } else {
                  // No credential of either kind is configured. Without this branch the state machine would stay in
                  // `authenticating` forever: the callback would never fire and every later input would be deferred.
                  debug('no shared access signature or token credential configured: cannot authenticate');
                  this._fsm.transition('disconnecting', new Error('Cannot authenticate: neither a shared access signature nor a token credential was provided in the transport configuration.'), callback);
                }
              }
            });
          },
          disconnect: (callback) => {
            this._fsm.transition('disconnecting', null, callback);
          },
          amqpError: (err) => {
            this._fsm.transition('disconnecting', err);
          },
          // Every other input is queued and replayed after the next state transition.
          '*': () => this._fsm.deferUntilTransition()
        },
        authenticated: {
          _onEnter: (isApplicationSuppliedSas, callback) => {
            // An application-supplied SAS string cannot be renewed by the transport, so a renewal timer is
            // only armed for the other credential kinds.
            if (!isApplicationSuppliedSas) {
              const renewalCallback = this._config.sharedAccessSignature ?
                this._handleSASRenewal :
                this._handleTokenCredentialRenewal;
              this._renewalTimeout = setTimeout(renewalCallback.bind(this), this._renewalNumberOfMilliseconds);
            }
            callback(null, new results.Connected());
          },
          _onExit: (_callback) => {
            if (this._renewalTimeout) {
              clearTimeout(this._renewalTimeout);
              // Drop the stale handle so the field reflects that no renewal is pending.
              this._renewalTimeout = null;
            }
          },
          connect: (callback) => callback(),
          disconnect: (callback) => this._fsm.transition('disconnecting', null, callback),
          send: (amqpMessage, deviceEndpoint, callback) => {
            /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_003: [The message generated by the `send` method should have its “to” field set to the device ID passed as an argument.]*/
            amqpMessage.to = deviceEndpoint;
            if (!this._c2dLink) {
              debug('attaching new sender link: ' + this._c2dEndpoint);
              /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_027: [The `send` method shall attach the C2D link if necessary.]*/
              this._amqp.attachSenderLink(this._c2dEndpoint, null, (err, link) => {
                if (err) {
                  debug('error trying to attach new sender link: ' + err.toString());
                  /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_029: [The `send` method shall call its callback with an error if it fails to attach the C2D link.]*/
                  callback(err);
                } else {
                  debug('sender link attached. sending message.');
                  this._c2dLink = link;
                  this._c2dLink.on('error', this._c2dErrorListener);
                  /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_030: [The `send` method shall call the `send` method of the C2D link and pass it the Amqp request that it created.]*/
                  this._c2dLink.send(amqpMessage, callback);
                }
              });
            } else {
              debug('reusing existing sender link: ' + this._c2dEndpoint + '. sending message.');
              /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_028: [The `send` method shall reuse the C2D link if it is already attached.]*/
              this._c2dLink.send(amqpMessage, callback);
            }
          },
          getFeedbackReceiver: (callback) => {
            if (this._feedbackReceiver) {
              /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_035: [The `getFeedbackReceiver` method shall reuse the existing feedback receiver it if has already been attached.]*/
              callback(null, this._feedbackReceiver);
            } else {
              /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_013: [The `getFeedbackReceiver` method shall request an `AmqpReceiver` object from the base AMQP transport for the `/messages/serviceBound/feedback` endpoint.]*/
              this._amqp.attachReceiverLink(this._feedbackEndpoint, null, (err, link) => {
                if (err) {
                  callback(err);
                } else {
                  this._feedbackReceiver = new ServiceReceiver(link);
                  this._feedbackReceiver.on('error', this._feedbackErrorListener);
                  callback(null, this._feedbackReceiver);
                }
              });
            }
          },
          getFileNotificationReceiver: (callback) => {
            /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_038: [The `getFileNotificationReceiver` method shall reuse the existing feedback receiver it if has already been attached.]*/
            if (this._fileNotificationReceiver) {
              callback(null, this._fileNotificationReceiver);
            } else {
              /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_016: [The `getFileNotificationReceiver` method shall request an `AmqpReceiver` object from the base AMQP transport for the `/messages/serviceBound/filenotifications` endpoint.]*/
              this._amqp.attachReceiverLink(this._fileNotificationEndpoint, null, (err, link) => {
                if (err) {
                  callback(err);
                } else {
                  this._fileNotificationReceiver = new ServiceReceiver(link);
                  this._fileNotificationReceiver.on('error', this._fileNotificationErrorListener);
                  callback(null, this._fileNotificationReceiver);
                }
              });
            }
          },
          updateSharedAccessSignature: (updatedSAS, callback) => {
            /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_031: [The `updateSharedAccessSignature` shall trigger a `putToken` call on the base transport if it is connected.]*/
            const audience = SharedAccessSignature.parse(this._config.sharedAccessSignature.toString(), ['sr', 'sig', 'se']).sr;
            this._amqp.putToken(audience, updatedSAS, callback);
          },
          updateAccessToken: (tokenValue, callback) => {
            this._amqp.putToken(this._config.tokenScope, tokenValue, callback);
          },
          amqpError: (err) => {
            this._fsm.transition('disconnecting', err);
          }
        },
        disconnecting: {
          // Tears down, in order: the C2D sender link, the feedback receiver, the file notification receiver and
          // finally the AMQP connection. When entered with an error the links are force-detached; otherwise they
          // are detached gracefully. Only the first error encountered is reported to the `disconnected` state.
          _onEnter: (err, disconnectCallback) => {
            let finalError: Error = err;
            async.series([
              (callback) => {
                /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_021: [The `disconnect` method shall detach the C2D messaging link if it is attached.]*/
                if (this._c2dLink) {
                  const tmpC2DLink = this._c2dLink;
                  this._c2dLink = undefined;
                  if (err) {
                    debug('force-detaching c2d links');
                    tmpC2DLink.forceDetach(err);
                    // Remove the listener (after the detach, like the receivers below) so that a late error from
                    // this discarded link cannot clear a newer `_c2dLink` attached after a reconnect.
                    tmpC2DLink.removeListener('error', this._c2dErrorListener);
                    callback();
                  } else {
                    tmpC2DLink.detach((detachErr) => {
                      if (detachErr) {
                        debug('error detaching the c2d link: ' + detachErr.toString());
                        if (!finalError) {
                          finalError = translateError('error while detaching the c2d link when disconnecting', detachErr);
                        }
                      } else {
                        debug('c2d link detached.');
                      }
                      tmpC2DLink.removeListener('error', this._c2dErrorListener);
                      callback();
                    });
                  }
                } else {
                  callback();
                }
              },
              (callback) => {
                /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_022: [The `disconnect` method shall detach the C2D feedback receiver link if it is attached.]*/
                if (this._feedbackReceiver) {
                  const tmpFeedbackReceiver = this._feedbackReceiver;
                  this._feedbackReceiver = undefined;

                  if (err) {
                    tmpFeedbackReceiver.forceDetach(err);
                    tmpFeedbackReceiver.removeListener('error', this._feedbackErrorListener);
                    callback();
                  } else {
                    tmpFeedbackReceiver.detach((detachErr) => {
                      if (detachErr) {
                        debug('error detaching the message feedback link: ' + detachErr.toString());
                      } else {
                        debug('feedback link detached');
                      }
                      tmpFeedbackReceiver.removeListener('error', this._feedbackErrorListener);
                      if (!finalError && detachErr) {
                        finalError = translateError('error while detaching the message feedback link when disconnecting', detachErr);
                      }
                      callback();
                    });
                  }
                } else {
                  callback();
                }
              },
              /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_023: [The `disconnect` method shall detach the file notification receiver link if it is attached.]*/
              (callback) => {
                if (this._fileNotificationReceiver) {
                  const tmpFileNotificationReceiver = this._fileNotificationReceiver;
                  this._fileNotificationReceiver = undefined;

                  if (err) {
                    tmpFileNotificationReceiver.forceDetach(err);
                    tmpFileNotificationReceiver.removeListener('error', this._fileNotificationErrorListener);
                    callback();
                  } else {
                    tmpFileNotificationReceiver.detach((detachErr) => {
                      if (detachErr) {
                        debug('error detaching the file upload notification link: ' + detachErr.toString());
                      } else {
                        debug('File notification link detached');
                      }
                      tmpFileNotificationReceiver.removeListener('error', this._fileNotificationErrorListener);
                      if (!finalError && detachErr) {
                        finalError = translateError('error while detaching the file upload notification link when disconnecting', detachErr);
                      }
                      callback();
                    });
                  }
                } else {
                  callback();
                }
              },
              (callback) => {
                this._amqp.disconnect((disconnectErr) => {
                  if (disconnectErr) {
                    debug('error disconnecting the AMQP connection: ' + disconnectErr.toString());
                  } else {
                    debug('amqp connection successfully disconnected.');
                  }
                  if (!finalError && disconnectErr) {
                    finalError = translateError('error while disconnecting the AMQP connection', disconnectErr);
                  }
                  callback();
                });
              }
            ], () => {
              /*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_024: [Any error generated by detaching a link should be passed as the single argument of the callback of the `disconnect` method.]*/
              this._fsm.transition('disconnected', finalError, disconnectCallback);
            });
          },
          // Every input received while tearing down is queued and replayed after the next state transition.
          '*': () => this._fsm.deferUntilTransition()
        }
      }
    });
  }