constructor()

in device/transport/amqp/src/amqp.ts [77:610]


  constructor(authenticationProvider: AuthenticationProvider, baseClient?: BaseAmqpClient) {
    // Builds the AMQP device transport. In order, this constructor:
    //   1. stores the authentication provider and, for token-based authentication, subscribes to token renewals;
    //   2. picks the underlying AMQP client (`baseClient` when supplied, a new `BaseAmqpClient` otherwise)
    //      and registers the handler that is told when the connection closes;
    //   3. creates the device-method and twin clients and re-emits their errors as transport errors;
    //   4. prepares the listeners that will be attached to the C2D and D2C links once those links exist;
    //   5. builds the state machine (`disconnected` -> `connecting` -> `authenticating` -> `authenticated`
    //      -> `disconnecting` -> `disconnected`) that every transport operation is routed through.
    //
    // authenticationProvider: source of the device credentials (`getDeviceCredentials`) and of the `type`
    //                         that selects token vs. X509 behavior.
    // baseClient:             optional AMQP client to use instead of creating one
    //                         (presumably for test injection - confirm against callers).
    super();
    this._authenticationProvider = authenticationProvider;

    /*Codes_SRS_NODE_DEVICE_AMQP_16_056: [If the `authenticationProvider` object passed to the `Amqp` constructor has a `type` property which value is set to `AuthenticationType.Token` the `Amqp` constructor shall subscribe to the `newTokenAvailable` event of the `authenticationProvider` object.]*/
    if (this._authenticationProvider.type === AuthenticationType.Token) {
      // `this._fsm` is assigned further down in this constructor; this handler only dereferences it when the event fires.
      (<any>this._authenticationProvider).on('newTokenAvailable', (newCredentials) => {
        /*Codes_SRS_NODE_DEVICE_AMQP_16_057: [If a `newTokenAvailable` event is emitted by the `authenticationProvider` object passed as an argument to the constructor, a `putToken` operation shall be initiated with the new shared access signature if the amqp connection is already connected.]*/
        this._fsm.handle('updateSharedAccessSignature', newCredentials.sharedAccessSignature, (err) => {
          if (err) {
            /*Codes_SRS_NODE_DEVICE_AMQP_16_058: [If the `putToken` operation initiated upon receiving a `newTokenAvailable` event fails, a `disconnect` event shall be emitted with the error from the failed `putToken` operation.]*/
            debugErrors('Error updating the shared access signature: ' + getErrorName(err));
            this._fsm.handle('disconnect', () => {
              debug('emitting the disconnect event in response to a update signature failure');
              this.emit('disconnect', err);
            });
          }
        });
      });
    }

    this._amqp = baseClient || new BaseAmqpClient(false);
    // Connection closures reported by the AMQP client go through the state machine first (so that it can tear
    // down whatever is attached); the `disconnect` event is emitted from the callback the state machine invokes.
    this._amqp.setDisconnectHandler((err) => {
      if (err) {
        debugErrors('disconnected event handler: ' + getErrorName(err));
      } else {
        debug('disconnected event handler: no error');
      }
      this._fsm.handle('amqpConnectionClosed', err, () => {
        this.emit('disconnect', getTranslatedError(err, 'AMQP client disconnected'));
      });
    });

    this._deviceMethodClient = new AmqpDeviceMethodClient(this._authenticationProvider, this._amqp);
    /*Codes_SRS_NODE_DEVICE_AMQP_16_041: [Any `error` event received on any of the links used for device methods shall trigger the emission of an `error` event by the transport, with an argument that is a `MethodsDetachedError` object with the `innerError` property set to that error.]*/
    this._deviceMethodClient.on('error', (err) => {
      const methodsError = new errors.DeviceMethodsDetachedError('Device Methods AMQP links failed');
      methodsError.innerError = err;
      this.emit('error', methodsError);
    });

    this._twinClient = new AmqpTwinClient(this._authenticationProvider, this._amqp);
    /*Codes_SRS_NODE_DEVICE_AMQP_16_048: [Any `error` event received on any of the links used for twin shall trigger the emission of an `error` event by the transport, with an argument that is a `TwinDetachedError` object with the `innerError` property set to that error.]*/
    this._twinClient.on('error', (err) => {
      const twinError = new errors.TwinDetachedError('Twin AMQP links failed');
      twinError.innerError = err;
      this.emit('error', twinError);
    });

    // Desired-property patches are forwarded unchanged.
    this._twinClient.on('twinDesiredPropertiesUpdate', (patch) => this.emit('twinDesiredPropertiesUpdate', patch));

    /*Codes_SRS_NODE_DEVICE_AMQP_16_034: [Any `error` event received on the C2D link shall trigger the emission of an `error` event by the transport, with an argument that is a `C2DDetachedError` object with the `innerError` property set to that error.]*/
    this._c2dErrorListener = (err) => {
      debugErrors('Error on the C2D link: ' + getErrorName(err));
      const c2dError = new errors.CloudToDeviceDetachedError('Cloud-to-device AMQP link failed');
      c2dError.innerError = err;
      this.emit('error', c2dError);
    };

    // The same receiver link carries plain C2D messages and module input messages; `_messageEventName`
    // (set while connecting) decides which event is emitted.
    this._c2dMessageListener = (msg: AmqpMessage) => {
      // Stays undefined when the message carries no annotations.
      let inputName: string;
      if (msg.message_annotations) {
        inputName = msg.message_annotations['x-opt-input-name'];
      }
      if (this._messageEventName === 'inputMessage') {
        /*Codes_SRS_NODE_DEVICE_AMQP_18_014: [If `amqp` receives a message on the input message link, it shall emit an "inputMessage" event with the value of the annotation property "x-opt-input-name" as the first parameter and the agnostic message as the second parameter.]*/
        debug('inputMessage received on C2D link, emitting \'inputMessage\'');
        this.emit('inputMessage', inputName, AmqpMessage.toMessage(msg));
      } else {
        /*Codes_SRS_NODE_DEVICE_AMQP_18_013: [If `amqp` receives a message on the C2D link, it shall emit a "message" event with the message as the event parameter.]*/
        debug('message received on C2D link, emitting \'message\'');
        this.emit('message', AmqpMessage.toMessage(msg));
      }
    };

    this._d2cErrorListener = (err) => {
      debugErrors('Error on the D2C link: ' + getErrorName(err));
      // Dropping the reference makes the next `sendEvent` attach a fresh link.
      this._d2cLink = null;
      // we don't really care because we can reattach the link every time we send and surface the error at that time.
    };

    // Internal emitter used to hand the outcome of one link-attach attempt to every operation waiting on it.
    // Any number of operations may be waiting at once, hence no listener limit.
    this._amqpLinkEmitter = new EventEmitter();
    this._amqpLinkEmitter.setMaxListeners(Infinity);

    this._fsm = new machina.Fsm({
      initialState: 'disconnected',
      states: {
        // No connection. Operations that need one trigger `connect` and replay themselves on success;
        // the `disable*` operations and `disconnect` complete immediately.
        disconnected: {
          // `err` and `callback` are whatever the transition into this state passed along; either may be absent.
          _onEnter: (err, callback) => {
            /*Codes_SRS_NODE_DEVICE_AMQP_16_083: [When the `amqp` client is disconnected and if token-based authentication is used the `stop` method of the `AuthenticationProvider` shall be called.]*/
            if (this._authenticationProvider.type === AuthenticationType.Token) {
              (this._authenticationProvider as SharedAccessKeyAuthenticationProvider).stop();
            }

            if (callback) {
              if (err) {
                callback(err);
              } else {
                callback(null, new results.Disconnected());
              }
            } else if (err) {
              // Nobody is waiting for this outcome: surface the error as an event instead of dropping it.
              this.emit('error', err);
            }
          },
          connect: (connectCallback) => this._fsm.transition('connecting', connectCallback),
          disconnect: (disconnectCallback) => {
            if (disconnectCallback) {
              disconnectCallback(null, new results.Disconnected());
            }
          },
          sendEvent: (amqpMessage, sendCallback) => {
            /*Codes_SRS_NODE_DEVICE_AMQP_16_024: [The `sendEvent` method shall connect and authenticate the transport if necessary.]*/
            /*Codes_SRS_NODE_DEVICE_AMQP_18_005: [The `sendOutputEvent` method shall connect and authenticate the transport if necessary.]*/
            this._fsm.handle('connect', (err, _result) => {
              if (err) {
                sendCallback(err);
              } else {
                this._fsm.handle('sendEvent', amqpMessage, sendCallback);
              }
            });
          },
          updateSharedAccessSignature: (_token, callback) => {
            // nothing to do here: the SAS has been updated in the config object.
            callback(null, new results.SharedAccessSignatureUpdated(false));
          },
          getTwin: (callback)  => {
            /*Codes_SRS_NODE_DEVICE_AMQP_16_059: [The `getTwin` method shall connect and authenticate the transport if it is disconnected.]*/
            this._fsm.handle('connect', (err, _result) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_AMQP_16_060: [The `getTwin` method shall call its callback with an error if connecting fails.]*/
                /*Codes_SRS_NODE_DEVICE_AMQP_16_061: [The `getTwin` method shall call its callback with an error if authenticating fails.]*/
                callback(err);
              } else {
                this._fsm.handle('getTwin', callback);
              }
            });
          },
          updateTwinReportedProperties: (patch, callback)  => {
            /*Codes_SRS_NODE_DEVICE_AMQP_16_065: [The `updateTwinReportedProperties` method shall connect and authenticate the transport if it is disconnected.]*/
            this._fsm.handle('connect', (err, _result) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_AMQP_16_066: [The `updateTwinReportedProperties` method shall call its callback with an error if connecting fails.]*/
                /*Codes_SRS_NODE_DEVICE_AMQP_16_067: [The `updateTwinReportedProperties` method shall call its callback with an error if authenticating fails.]*/
                callback(err);
              } else {
                this._fsm.handle('updateTwinReportedProperties', patch, callback);
              }
            });
          },
          enableTwinDesiredPropertiesUpdates:  (callback)  => {
            /*Codes_SRS_NODE_DEVICE_AMQP_16_071: [The `enableTwinDesiredPropertiesUpdates` method shall connect and authenticate the transport if it is disconnected.]*/
            this._fsm.handle('connect', (err, _result) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_AMQP_16_072: [The `enableTwinDesiredPropertiesUpdates` method shall call its callback with an error if connecting fails.]*/
                /*Codes_SRS_NODE_DEVICE_AMQP_16_073: [The `enableTwinDesiredPropertiesUpdates` method shall call its callback with an error if authenticating fails.]*/
                callback(err);
              } else {
                this._fsm.handle('enableTwinDesiredPropertiesUpdates', callback);
              }
            });
          },
          disableTwinDesiredPropertiesUpdates: (callback) => callback(),
          /*Codes_SRS_NODE_DEVICE_AMQP_16_031: [The `enableC2D` method shall connect and authenticate the transport if it is disconnected.]*/
          enableC2D: (callback) => {
            this._fsm.handle('connect', (err, _result) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/
                callback(err);
              } else {
                this._fsm.handle('enableC2D', callback);
              }
            });
          },
          /*Codes_SRS_NODE_DEVICE_AMQP_16_037: [The `disableC2D` method shall call its `callback` immediately if the transport is already disconnected.]*/
          disableC2D: (callback) => {
            // if we are disconnected the C2D link is already detached.
            callback();
          },
          /*Codes_SRS_NODE_DEVICE_AMQP_16_038: [The `enableMethods` method shall connect and authenticate the transport if it is disconnected.]*/
          enableMethods: (callback) => {
            this._fsm.handle('connect', (err, _result) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_AMQP_16_040: [The `enableMethods` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach method links.]*/
                callback(err);
              } else {
                this._fsm.handle('enableMethods', callback);
              }
            });
          },
          /*Codes_SRS_NODE_DEVICE_AMQP_16_044: [The `disableMethods` method shall call its `callback` immediately if the transport is already disconnected.]*/
          disableMethods: (callback) => {
            // if we are disconnected the device method links are already detached.
            callback();
          },
          amqpConnectionClosed: (err, callback) => {
            /*Codes_SRS_NODE_DEVICE_AMQP_16_080: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is disconnected, the call shall be ignored.]*/
            debugErrors('ignoring amqpConnectionClosed because already disconnected: ' + getErrorName(err));
            callback();
          }
        },
        // Fetching credentials and opening the AMQP connection. Anything other than `disconnect`,
        // `updateSharedAccessSignature` and `amqpConnectionClosed` is deferred until the next transition.
        connecting: {
          _onEnter: (connectCallback) => {
            /*Codes_SRS_NODE_DEVICE_AMQP_16_054: [The `connect` method shall get the current credentials by calling `getDeviceCredentials` on the `AuthenticationProvider` object passed to the constructor as an argument.]*/
            this._authenticationProvider.getDeviceCredentials((err, credentials) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_AMQP_16_055: [The `connect` method shall call its callback with an error if the callback passed to the `getDeviceCredentials` method is called with an error.]*/
                this._fsm.transition('disconnected', translateError('AMQP Transport: Could not get credentials', err), connectCallback);
              } else {
                // Module identities use module-scoped endpoints and receive `inputMessage` events;
                // device identities use device-scoped endpoints and receive `message` events.
                if (credentials.moduleId) {
                  this._c2dEndpoint = endpoint.moduleMessagePath(credentials.deviceId, credentials.moduleId);
                  this._d2cEndpoint = endpoint.moduleEventPath(credentials.deviceId, credentials.moduleId);
                  this._messageEventName = 'inputMessage';
                } else {
                  this._c2dEndpoint = endpoint.deviceMessagePath(credentials.deviceId);
                  this._d2cEndpoint = endpoint.deviceEventPath(credentials.deviceId);
                  this._messageEventName = 'message';
                }
                /*Tests_SRS_NODE_DEVICE_AMQP_41_001: [ The AMQP transport should use the productInfo string in the `options` object if present ]*/
                /*Tests_SRS_NODE_DEVICE_AMQP_41_002: [ The connect method shall set the productInfo on the options object when calling the underlying connection object's connect method if it was supplied. ]*/
                const customInfo = (this._options && this._options.productInfo) ? this._options.productInfo : '';
                getUserAgentString(customInfo, (userAgentString) => {
                  const config: AmqpBaseTransportConfig = {
                    uri: this._getConnectionUri(credentials),
                    sslOptions: credentials.x509,
                    userAgentString: userAgentString
                  };
                  if (this._options) {
                    // `credentials.x509` may be absent; make sure there is an object to attach the options to.
                    config.sslOptions = config.sslOptions || {};
                    /*Codes_SRS_NODE_DEVICE_AMQP_13_002: [ The connect method shall set the CA cert on the options object when calling the underlying connection object's connect method if it was supplied. ]*/
                    if (this._options.ca) {
                      config.sslOptions.ca = this._options.ca;
                    }
                    /*Codes_SRS_NODE_DEVICE_AMQP_99_084: [ The connect method shall set the HTTPS agent on the options object when calling the underlying connection object's connect method if it was supplied. ]*/
                    if (this._options.amqp && this._options.amqp.webSocketAgent) {
                      config.sslOptions.agent = this._options.amqp.webSocketAgent;
                    }
                  }
                  this._amqp.connect(config, (err, connectResult) => {
                    if (err) {
                      this._fsm.transition('disconnected', translateError('AMQP Transport: Could not connect', err), connectCallback);
                    } else {
                      this._fsm.transition('authenticating', connectResult, connectCallback);
                    }
                  });
                });
              }
            });
          },
          disconnect: (disconnectCallback) => this._fsm.transition('disconnecting', null, disconnectCallback),
          updateSharedAccessSignature: (_token, callback) => {
            callback(null, new results.SharedAccessSignatureUpdated(false));
          },
          /*Codes_SRS_NODE_DEVICE_AMQP_16_081: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connecting or authenticating, the connection shall be stopped and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error.]*/
          amqpConnectionClosed: (err, callback) => this._fsm.transition('disconnecting', err, callback),
          '*': () => this._fsm.deferUntilTransition()
        },
        // Connection is open; for non-X509 authentication CBS is initialized and the token is put.
        // Any failure here goes through `disconnecting` so that the open connection gets closed.
        authenticating: {
          _onEnter: (connectResult, connectCallback) => {
            if (this._authenticationProvider.type === AuthenticationType.X509) {
              /*Codes_SRS_NODE_DEVICE_AMQP_06_005: [If x509 authentication is NOT being utilized then `initializeCBS` shall be invoked.]*/
              this._fsm.transition('authenticated', connectResult, connectCallback);
            } else {
              this._amqp.initializeCBS((err) => {
                if (err) {
                  /*Codes_SRS_NODE_DEVICE_AMQP_06_008: [If `initializeCBS` is not successful then the client will be disconnected.]*/
                  this._fsm.transition('disconnecting', getTranslatedError(err, 'AMQP Transport: Could not initialize CBS'), connectCallback);
                } else {
                  this._authenticationProvider.getDeviceCredentials((err, credentials) => {
                    if (err) {
                      this._fsm.transition('disconnecting', getTranslatedError(err, 'AMQP Transport: Could not get credentials from AuthenticationProvider'), connectCallback);
                    } else {
                      /*Codes_SRS_NODE_DEVICE_AMQP_06_006: [If `initializeCBS` is successful, `putToken` shall be invoked If `initializeCBS` is successful, `putToken` shall be invoked with the first parameter `audience`, created from the `sr` of the shared access signature, the actual shared access signature, and a callback.]*/
                      this._amqp.putToken(SharedAccessSignature.parse(credentials.sharedAccessSignature, ['sr', 'sig', 'se']).sr, credentials.sharedAccessSignature, (err) => {
                        if (err) {
                          /*Codes_SRS_NODE_DEVICE_AMQP_06_009: [If `putToken` is not successful then the client will be disconnected.]*/
                          this._fsm.transition('disconnecting', getTranslatedError(err, 'AMQP Transport: Could not authorize with putToken'), connectCallback);
                        } else {
                          this._fsm.transition('authenticated', connectResult, connectCallback);
                        }
                      });
                    }
                  });
                }
              });
            }
          },
          disconnect: (disconnectCallback) => this._fsm.transition('disconnecting', null, disconnectCallback),
          /*Codes_SRS_NODE_DEVICE_AMQP_16_081: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connecting or authenticating, the connection shall be stopped and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error.]*/
          amqpConnectionClosed: (err, callback) => this._fsm.transition('disconnecting', err, callback),
          '*': () => this._fsm.deferUntilTransition()
        },
        // Connected and authorized: operations are carried out directly, attaching links on demand.
        authenticated: {
          _onEnter: (connectResult, connectCallback) => {
            /*Codes_SRS_NODE_DEVICE_AMQP_41_005: [ Once the amqp client is authenticated it will emit a `connected` event ]*/
            this.emit('connected');
            connectCallback(null, connectResult);
          },
          connect: (connectCallback) => connectCallback(null, new results.Connected()),
          disconnect: (disconnectCallback) => this._fsm.transition('disconnecting', null, disconnectCallback),
          sendEvent: (amqpMessage, sendCallback) => {
            amqpMessage.to = this._d2cEndpoint;

            /*Codes_SRS_NODE_DEVICE_AMQP_16_025: [The `sendEvent` method shall create and attach the d2c link if necessary.]*/
            /*Codes_SRS_NODE_DEVICE_AMQP_18_006: [The `sendOutputEvent` method shall create and attach the d2c link if necessary.]*/
            if (this._d2cLink) {
              debug('using existing d2c link');
              this._d2cLink.send(amqpMessage, handleResult('AMQP Transport: Could not send', sendCallback));
            } else {
              debug('waiting for a D2C link');
              this._amqpLinkEmitter.once('senderLinkAttached', (err) => {
                if (err) {
                  handleResult('AMQP Transport: Could not attach sender link', sendCallback)(err);
                } else {
                  this._d2cLink.send(amqpMessage, handleResult('AMQP Transport: Could not send', sendCallback));
                }
              });
              // If we were the first listener for the senderLinkAttached event, we should create a sender link
              // If we are not the first listener, we know that attachSenderLink was already called and we shouldn't call it again.
              // Doing so would create unnecessary sender links.
              if (this._amqpLinkEmitter.listenerCount('senderLinkAttached') === 1) {
                debug('attaching D2C link');
                this._amqp.attachSenderLink(this._d2cEndpoint, null, (err, link) => {
                  if (err) {
                    debugErrors('error creating a D2C link: ' + getErrorName(err));
                  } else {
                    debug('got a new D2C link');
                    this._d2cLink = link;
                    this._d2cLink.on('error', this._d2cErrorListener);
                  }
                  // Wake every waiting sender, with the attach error if there was one.
                  this._amqpLinkEmitter.emit('senderLinkAttached', err);
                });
              }
            }
          },
          updateSharedAccessSignature: (sharedAccessSignature, updateSasCallback) => {
            /*Codes_SRS_NODE_DEVICE_AMQP_06_010: [If the AMQP connection is established, the `updateSharedAccessSignature` method shall call the amqp transport `putToken` method with the first parameter `audience`, created from the `sr` of the shared access signature, the actual shared access signature, and a callback.]*/
            this._amqp.putToken(SharedAccessSignature.parse(sharedAccessSignature, ['sr', 'sig', 'se']).sr, sharedAccessSignature, (err) => {
              if (err) {
                // The connection is closed before the failure is reported to the caller.
                this._amqp.disconnect(() => {
                  updateSasCallback(getTranslatedError(err, 'AMQP Transport: Could not authorize with puttoken'));
                });
              } else {
                /*Codes_SRS_NODE_DEVICE_AMQP_06_011: [The `updateSharedAccessSignature` method shall call the `done` callback with a null error object and a SharedAccessSignatureUpdated object as a result, indicating the client does NOT need to reestablish the transport connection.]*/
                updateSasCallback(null, new results.SharedAccessSignatureUpdated(false));
              }
            });
          },
          /*Codes_SRS_NODE_DEVICE_AMQP_16_062: [The `getTwin` method shall call the `getTwin` method on the `AmqpTwinClient` instance created by the constructor.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_16_063: [The `getTwin` method shall call its callback with and error if the call to `AmqpTwinClient.getTwin` fails.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_16_064: [The `getTwin` method shall call its callback with a `null` error parameter and the result of the `AmqpTwinClient.getTwin` method if it succeeds.]*/
          getTwin: (callback) => this._twinClient.getTwin(handleResult('could not get twin', callback)),
          /*Codes_SRS_NODE_DEVICE_AMQP_16_068: [The `updateTwinReportedProperties` method shall call the `updateTwinReportedProperties` method on the `AmqpTwinClient` instance created by the constructor.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_16_069: [The `updateTwinReportedProperties` method shall call its callback with and error if the call to `AmqpTwinClient.updateTwinReportedProperties` fails.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_16_070: [The `updateTwinReportedProperties` method shall call its callback with a `null` error parameter and the result of the `AmqpTwinClient.updateTwinReportedProperties` method if it succeeds.]*/
          updateTwinReportedProperties: (patch, callback) => this._twinClient.updateTwinReportedProperties(patch, handleResult('could not update twin reported properties', callback)),
          /*Codes_SRS_NODE_DEVICE_AMQP_16_074: [The `enableTwinDesiredPropertiesUpdates` method shall call the `enableTwinDesiredPropertiesUpdates` method on the `AmqpTwinClient` instance created by the constructor.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_16_075: [The `enableTwinDesiredPropertiesUpdates` method shall call its callback with and error if the call to `AmqpTwinClient.enableTwinDesiredPropertiesUpdates` fails.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_16_076: [The `enableTwinDesiredPropertiesUpdates` method shall call its callback with no arguments if the call to `AmqpTwinClient.enableTwinDesiredPropertiesUpdates` succeeds.]*/
          enableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.enableTwinDesiredPropertiesUpdates(handleResult('could not enable twin desired properties updates', callback)),
          /*Codes_SRS_NODE_DEVICE_AMQP_16_077: [The `disableTwinDesiredPropertiesUpdates` method shall call the `disableTwinDesiredPropertiesUpdates` method on the `AmqpTwinClient` instance created by the constructor.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_16_078: [The `disableTwinDesiredPropertiesUpdates` method shall call its callback with and error if the call to `AmqpTwinClient.disableTwinDesiredPropertiesUpdates` fails.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_16_079: [The `disableTwinDesiredPropertiesUpdates` method shall call its callback no arguments if the call to `AmqpTwinClient.disableTwinDesiredPropertiesUpdates` succeeds.]*/
          disableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.disableTwinDesiredPropertiesUpdates(handleResult('could not disable twin desired properties updates', callback)),
          enableC2D: (callback) => {
            /*Codes_SRS_NODE_DEVICE_AMQP_41_003: [The `enableC2D` method shall attach the C2D link only if it is not already attached.] */
            if (this._c2dLink) {
              debug('C2D link already attached, doing nothing....');
              process.nextTick(callback);
            } else {
              debug('waiting for a C2D link');
              this._amqpLinkEmitter.once('receiverLinkAttached', (err) => {
                if (err) {
                  /*Codes_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/
                  handleResult('AMQP Transport: Could not attach receiver link', callback)(err);
                } else {
                  callback();
                }
              });
              // If we were the first listener for the receiverLinkAttached event, we should create a receiver link
              // If we are not the first listener, we know that attachReceiverLink was already called and we shouldn't call it again.
              // Doing so would create unnecessary receiver links.
              if (this._amqpLinkEmitter.listenerCount('receiverLinkAttached') === 1) {
                debug('attaching C2D link');
                this._amqp.attachReceiverLink(this._c2dEndpoint, null, (err, receiverLink) => {
                  if (err) {
                    debugErrors('error creating a C2D link: ' + getErrorName(err));
                  } else {
                    /*Codes_SRS_NODE_DEVICE_AMQP_16_032: [The `enableC2D` method shall attach the C2D link and call its `callback` once it is successfully attached.]*/
                    debug('C2D link created and attached successfully');
                    this._c2dLink = receiverLink;
                    this._c2dLink.on('error', this._c2dErrorListener);
                    this._c2dLink.on('message', this._c2dMessageListener);
                  }
                  // Wake every waiting caller, with the attach error if there was one.
                  this._amqpLinkEmitter.emit('receiverLinkAttached', err);
                });
              }
            }
          },
          disableC2D: (callback) => {
            /*Codes_SRS_NODE_DEVICE_AMQP_41_004: [The `disableC2D` method shall detach the C2D link only if it is already attached.] */
            if (this._c2dLink) {
              /*Codes_SRS_NODE_DEVICE_AMQP_16_035: [The `disableC2D` method shall call `detach` on the C2D link and call its callback when it is successfully detached.]*/
              /*Codes_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/
              this._stopC2DListener(undefined, callback);
            } else {
              debug('C2D link already detached, doing nothing...');
              callback();
            }
          },
          enableMethods: (callback) => {
            // deviceMethodClient already checks if the link is attached or not, so we do not need to check that here like we do with C2D
            /*Codes_SRS_NODE_DEVICE_AMQP_16_039: [The `enableMethods` method shall attach the method links and call its `callback` once these are successfully attached.]*/
            /*Codes_SRS_NODE_DEVICE_AMQP_16_040: [The `enableMethods` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach method links.]*/
            this._deviceMethodClient.attach(callback);
          },
          disableMethods: (callback) => {
            // deviceMethodClient already checks if the link is attached or not, so we do not need to check that here like we do with C2D
            /*Codes_SRS_NODE_DEVICE_AMQP_16_042: [The `disableMethods` method shall call `detach` on the device method links and call its callback when these are successfully detached.]*/
            /*Codes_SRS_NODE_DEVICE_AMQP_16_043: [The `disableMethods` method shall call its `callback` with an `Error` if it fails to detach the device method links.]*/
            this._deviceMethodClient.detach(callback);
          },
          /*Codes_SRS_NODE_DEVICE_AMQP_16_082: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connected, the connection shall be disconnected and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error.]*/
          amqpConnectionClosed: (err, callback) => this._fsm.transition('disconnecting', err, callback)
        },
        // Teardown, run strictly in sequence: device method links, twin links, D2C link, C2D link, AMQP connection.
        // Every step always continues to the next one; `finalError` keeps the error that caused the disconnect
        // or, failing that, the first teardown error, and is what the `disconnected` state receives.
        disconnecting: {
          _onEnter: (err, disconnectCallback) => {
            let finalError = err;
            async.series([
              (callback) => {
                if (err) {
                  debugErrors('force-detaching device methods links because: ' + getErrorName(err));
                  this._deviceMethodClient.forceDetach();
                  callback();
                } else {
                  this._deviceMethodClient.detach((detachErr) => {
                    if (detachErr) {
                      debugErrors('error detaching methods links: ' + detachErr);
                      if (!finalError) {
                        finalError = translateError('error while detaching the methods links when disconnecting', detachErr);
                      }
                    } else {
                      debug('device methods links detached.');
                    }
                    callback();
                  });
                }
              },
              (callback) => {
                this._twinClient.detach((twinDetachError) => {
                  if (twinDetachError) {
                    debugErrors('error detaching twin links: ' + twinDetachError);
                    if (!finalError) {
                      finalError = translateError('error while detaching twin links', twinDetachError);
                    }
                  } else {
                    debug('device twin links detached');
                  }
                  callback();
                });
              },
              (callback) => {
                // Senders still waiting for a link are dropped from the emitter here.
                this._amqpLinkEmitter.removeAllListeners('senderLinkAttached');
                if (this._d2cLink) {
                  const tmpD2CLink = this._d2cLink;
                  this._d2cLink = undefined;

                  if (err) {
                    /*Codes_SRS_NODE_DEVICE_AMQP_16_023: [The `disconnect` method shall forcefully detach all attached links if a connection error is the causing the transport to be disconnected.]*/
                    tmpD2CLink.forceDetach(err);
                    tmpD2CLink.removeListener('error', this._d2cErrorListener);
                  }
                  // NOTE: `detach` is issued even after a forced detach; this step only completes from its callback.
                  tmpD2CLink.detach((detachErr) => {
                    if (detachErr) {
                      debugErrors('error detaching the D2C link: ' + detachErr);
                    } else {
                      debug('D2C link detached');
                    }
                    tmpD2CLink.removeListener('error', this._d2cErrorListener);
                    if (!finalError && detachErr) {
                      finalError = translateError('error while detaching the D2C link when disconnecting', detachErr);
                    }
                    callback();
                  });
                } else {
                  callback();
                }
              },
              (callback) => {
                // Callers still waiting for the C2D link are dropped from the emitter here.
                this._amqpLinkEmitter.removeAllListeners('receiverLinkAttached');
                if (this._c2dLink) {
                  /*Codes_SRS_NODE_DEVICE_AMQP_16_022: [The `disconnect` method shall detach all attached links.]*/
                  this._stopC2DListener(err, (detachErr) => {
                    if (!finalError && detachErr) {
                      finalError = translateError('error while detaching the C2D link when disconnecting', detachErr);
                    }
                    callback();
                  });
                } else {
                  callback();
                }
              },
              (callback) => {
                this._amqp.disconnect((disconnectErr) => {
                  if (disconnectErr) {
                    debugErrors('error disconnecting the AMQP connection: ' + disconnectErr);
                  } else {
                    debug('amqp connection successfully disconnected.');
                  }
                  if (!finalError && disconnectErr) {
                    finalError = translateError('error while disconnecting the AMQP connection', disconnectErr);
                  }
                  callback();
                });
              }
            ], () => {
              /*Codes_SRS_NODE_DEVICE_AMQP_16_010: [The `done` callback method passed in argument shall be called when disconnected.]*/
              /*Codes_SRS_NODE_DEVICE_AMQP_16_011: [The `done` callback method passed in argument shall be called with an error object if disconnecting fails.]*/
              this._fsm.transition('disconnected', finalError, disconnectCallback);
            });
          },
          // Everything requested during teardown waits for the next transition.
          '*': (_connectCallback) => this._fsm.deferUntilTransition()
        },
      }
    });

    // Trace every state change for debugging.
    this._fsm.on('transition', (data) => {
      debug(data.fromState + ' -> ' + data.toState + ' (' + data.action + ')');
    });
  }