constructor()

in common/transport/amqp/src/amqp.ts [127:692]


  constructor(autoSettleMessages: boolean) {
    // node-amqp10 has an automatic reconnection/link re-attach feature that is enabled by default.
    // In our case we want to control the reconnection flow ourselves, so we need to disable it.

    /*Codes_SRS_NODE_COMMON_AMQP_16_042: [The Amqp constructor shall create a new `rhea.Client` instance and configure it to:
    - not reconnect on failure
    - not reattach sender and receiver links on failure]*/

    this._rheaContainer = rheaCreateContainer();

    this._rheaContainer.on('azure-iot-amqp-base:error-indicated', (err: AmqpError) => {
      debugErrors('azure-iot-amqp-base:error-indicated invoked ' + getErrorName(err));
      this._fsm.handle('amqpError', err);

    });

    const rheaErrorHandler = (context: EventContext) => {
      debugErrors(`rhea error event handler: error = ${context.error}`);
      this._fsm.handle('error', context);
    };

    const connectionErrorHandler = (context: EventContext) => {
      debugErrors(`connection error event handler: error = ${context.error}`);
      this._fsm.handle('connection_error', context);
    };

    const connectionCloseHandler = (context: EventContext) => {
      debug('connection close event handler');
      this._fsm.handle('connection_close', context);
    };

    const connectionOpenHandler = (context: EventContext) => {
      debug('connection open event handler');
      this._fsm.handle('connection_open', context);
    };

    const connectionDisconnectedHandler = (context: EventContext) => {
      if (context.error) {
        debugErrors(`connection disconnected event handler: error = ${context.error}`);
      } else {
        debug('connection disconnected event handler');
      }
      this._fsm.handle('disconnected', context);
    };
    const manageConnectionHandlers = (operation: string) => {
      this._rheaConnection[operation]('connection_error', connectionErrorHandler);
      this._rheaConnection[operation]('connection_open', connectionOpenHandler);
      this._rheaConnection[operation]('connection_close', connectionCloseHandler);
      this._rheaConnection[operation]('disconnected', connectionDisconnectedHandler);
      this._rheaConnection[operation]('error', rheaErrorHandler);
    };

    const sessionErrorHandler = (context: EventContext) => {
      debugErrors(`session error event handler: error = ${getErrorName(context.error)}`);
      this._fsm.handle('session_error', context);
    };
    const sessionOpenHandler = (context: EventContext) => {
      debug('session open event handler');
      this._fsm.handle('session_open', context);
    };
    const sessionCloseHandler = (context: EventContext) => {
      debug('session close event handler');
      this._fsm.handle('session_close', context);
    };
    const manageSessionHandlers = (operation: string) => {
      this._rheaSession[operation]('session_error', sessionErrorHandler);
      this._rheaSession[operation]('session_open', sessionOpenHandler);
      this._rheaSession[operation]('session_close', sessionCloseHandler);
    };

    this._fsm = new machina.Fsm({
      namespace: 'amqp-base',
      initialState: 'disconnected',
      states: {
        disconnected: {
          _onEnter: (disconnectCallback, err, _result) => {
            if (disconnectCallback) {
              if (err) {
                this._safeCallback(disconnectCallback, err);
              } else {
                this._safeCallback(disconnectCallback, null, new results.Disconnected());
              }
            } else if (this._disconnectHandler) {
              debug('calling upper layer disconnect handler');
              debugErrors('error passed to disconnect handler is: ' + getErrorName(err || new errors.NotConnectedError('rhea: connection disconnected')));
              this._disconnectHandler(err || new errors.NotConnectedError('rhea: connection disconnected'));
            }
          },
          amqpError: (err) => {
            debugErrors('received an error while disconnected: maybe a bug: ' + (err ? err.name : 'falsy error object.'));
          },
          connect: (connectionParameters, connectCallback) => {
            this._fsm.transition('connecting', connectionParameters, connectCallback);
          },
          disconnect: (callback) => callback(null, new results.Disconnected()),
          attachSenderLink: (_endpoint, _linkOptions, callback) => callback(new errors.NotConnectedError()),
          attachReceiverLink: (_endpoint, _linkOptions, callback) => callback(new errors.NotConnectedError()),
          detachSenderLink: (_endpoint, callback) => this._safeCallback(callback),
          detachReceiverLink: (_endpoint, callback) => this._safeCallback(callback),
          initializeCBS: (callback) => callback(new errors.NotConnectedError()),
          putToken: (_audience, _token, callback) => callback(new errors.NotConnectedError()),
        },
        connecting: {
          _onEnter: (connectionParameters, connectCallback) => {
            this._rheaContainer.options.sender_options = {
              properties: {
                'com.microsoft:client-version': this._config.userAgentString
              },
              reconnect: false
            };
            this._rheaContainer.options.receiver_options = {
              properties: {
                'com.microsoft:client-version': this._config.userAgentString
              },
              reconnect: false,
              autoaccept: autoSettleMessages
            };
            this._connectionCallback = connectCallback;
            this._indicatedConnectionError = undefined;
            this._disconnectionOccurred = false;
            this._sessionCloseOccurred = false;
            this._connectionCloseOccurred = false;
            //
            // According to the rhea maintainers, one can depend on that fact that no actual network activity
            // will occur until the nextTick() after the call to connect.  Because of that, one can
            // put the event handlers on the rhea connection object returned from the connect call and be assured
            // that the listeners are in place BEFORE any possible events will be emitted on the connection.
            //
            this._rheaConnection = this._rheaContainer.connect(connectionParameters);
            manageConnectionHandlers('on');
          },
          connection_open: (context: EventContext) => {
            this._rheaConnection = context.connection;
            const callback = this._connectionCallback;
            this._connectionCallback = undefined;
            this._fsm.transition('connecting_session', callback);
          },
          connection_close: (_context: EventContext) => {
            const err = this._indicatedConnectionError;
            const callback = this._connectionCallback;
            this._indicatedConnectionError = undefined;
            this._connectionCallback = undefined;
            this._connectionCloseOccurred = true;
            manageConnectionHandlers('removeListener');
            this._fsm.transition('disconnected', callback, err);
          },
          connection_error: (context: EventContext) => {
            this._indicatedConnectionError = context.connection.error as AmqpError;
          },
          error: (context: EventContext) => {
            const callback = this._connectionCallback;
            this._connectionCallback = undefined;
            manageConnectionHandlers('removeListener');
            this._fsm.transition('disconnected', callback, context.connection.error);
          },
          disconnected: (_context: EventContext) => {
            const callback = this._connectionCallback;
            this._connectionCallback = undefined;
            manageConnectionHandlers('removeListener');
            this._fsm.transition('disconnected', callback, new errors.NotConnectedError('rhea: connection disconnected'));
          },
          '*': () => this._fsm.deferUntilTransition()
        },
        connecting_session: {
          _onEnter: (connectCallback, result) => {
            this._sessionCallback = connectCallback;
            this._sessionResult = result;
            //
            // According to the rhea maintainers, one can depend on that fact that no actual network activity
            // will occur until the nextTick() after the call to create_session.  Because of that, one can
            // put the event handlers on the rhea session object returned from the create_session call and be assured
            // that the listeners are in place BEFORE any possible events will be emitted on the session.
            //
            this._rheaSession = this._rheaConnection.create_session();
            manageSessionHandlers('on');
            this._rheaSession.open();
          },
          session_open: (_context: EventContext) => {
            const callback = this._sessionCallback;
            const result = this._sessionResult;
            this._sessionCallback = undefined;
            this._sessionResult = undefined;
            this._fsm.transition('connected', callback, result);
          },
          session_error: (context: EventContext) => {
            this._indicatedSessionError = context.session.error;
          },
          session_close: (_context: EventContext) => {
            const err = this._indicatedSessionError;
            const callback = this._sessionCallback;
            this._indicatedSessionError = undefined;
            this._sessionCallback = undefined;
            this._sessionCloseOccurred = true;
            this._fsm.transition('disconnecting', callback, err);
          },
          connection_error: (context: EventContext) => {
            this._indicatedConnectionError = context.connection.error;
          },
          connection_close: (_context: EventContext) => {
            const err = this._indicatedConnectionError;
            const callback = this._sessionCallback;
            //
            // We lie about session close coming in.  Thing is that if we are here we don't actually have
            // a good session set up.  This way we won't wait around for a session end that probably won't come.
            //
            this._sessionCloseOccurred = true;
            this._indicatedConnectionError = undefined;
            this._sessionCallback = undefined;
            this._connectionCloseOccurred = true;
            this._fsm.transition('disconnecting', callback, err);
          },
          error: (context: EventContext) => {
            const callback = this._sessionCallback;
            //
            // We lie about session close coming in.  Thing is that if we are here we don't actually have
            // a good session set up.  This way we won't wait around for a session end that probably won't come.
            //
            this._sessionCloseOccurred = true;
            this._sessionCallback = undefined;
            this._fsm.transition('disconnecting', callback, context.connection.error);
          },
          disconnected: (_context: EventContext) => {
            const callback = this._sessionCallback;
            this._sessionCallback = undefined;
            manageConnectionHandlers('removeListener');
            this._fsm.transition('disconnected', callback, new errors.NotConnectedError('rhea: connection disconnected'));
          },
          amqpError: (err) => {
            this._fsm.transition('disconnecting', null, err);
          },
          '*': () => this._fsm.deferUntilTransition()
        },
        connected: {
          _onEnter: (connectCallback, result) => {
            /*Codes_SRS_NODE_COMMON_AMQP_16_002: [The `connect` method shall establish a connection with the IoT hub instance and if given as argument call the `done` callback with a null error object in the case of success and a `results.Connected` object.]*/
            this._safeCallback(connectCallback, null, new results.Connected(result));
          },
          session_error: (context: EventContext) => {
            this._indicatedSessionError = context.session.error;
          },
          session_close: (_context: EventContext) => {
            const err = this._indicatedSessionError;
            this._indicatedSessionError = undefined;
            this._sessionCloseOccurred = true;
            this._fsm.transition('disconnecting', null, err);
          },
          connection_error: (context: EventContext) => {
            this._indicatedConnectionError = context.connection.error;
          },
          connection_close: (_context: EventContext) => {
            const err = this._indicatedConnectionError;
            this._indicatedConnectionError = undefined;
            this._connectionCloseOccurred = true;
            this._fsm.transition('disconnecting', null, err);
          },
          error: (context: EventContext) => {
            if (context && context.connection) {
              this._fsm.transition('disconnecting', null, context.connection.error);
            } else {
              this._fsm.transition('disconnecting', null, context);
            }
          },
          disconnected: (_context: EventContext) => {
            this._disconnectionOccurred = true;
            this._fsm.transition('disconnecting', null, new errors.NotConnectedError('rhea: connection disconnected'));
          },
          amqpError: (err) => {
            this._fsm.transition('disconnecting', null, err);
          },
          connect: (_policyOverride, callback) => callback(null, new results.Connected()),
          disconnect: (disconnectCallback) => {
            this._fsm.transition('disconnecting', disconnectCallback);
          },
          initializeCBS: (callback) => {
            this._cbs = new ClaimsBasedSecurityAgent(this._rheaSession);
            this._cbs.attach(callback);
          },
          putToken: (audience, token, callback) => {
            if (!this._cbs) {
              this._fsm.handle('initializeCBS', (err) => {
                if (err) {
                  callback(err);
                } else {
                  this._fsm.handle('putToken', audience, token, callback);
                }
              });
            } else {
              this._cbs.putToken(audience, token, callback);
            }
          },
          send: (message: Message, endpoint: string, to: string, done: GenericAmqpBaseCallback<any>): void => {
            /*Codes_SRS_NODE_COMMON_AMQP_16_006: [The `send` method shall construct an AMQP message using information supplied by the caller, as follows:
            The ‘to’ field of the message should be set to the ‘to’ argument.
            The ‘body’ of the message should be built using the message argument.] */
            debug('call to deprecated api \'azure-iot-amqp-base.Amqp.send\'. You should be using SenderLink.send instead');
            const amqpMessage = AmqpMessage.fromMessage(message);
            if (to !== undefined) {
              amqpMessage.to = to;
            }

            if (!this._senders[endpoint]) {
              this._fsm.handle('attachSenderLink', endpoint, null, (err) => {
                if (err) {
                  debugErrors('failed to attach the sender link: ' + getErrorName(err));
                  done(err);
                } else {
                  (this._senders[endpoint] as SenderLink).send(amqpMessage, done);
                }
              });
            } else {
              (this._senders[endpoint] as SenderLink).send(amqpMessage, done);
            }

          },
          getReceiver: (endpoint: string, done: GenericAmqpBaseCallback<ReceiverLink>): void => {
            /*Codes_SRS_NODE_COMMON_AMQP_16_010: [If a receiver for this endpoint doesn’t exist, the getReceiver method should create a new AmqpReceiver object and then call the done() method with the object that was just created as an argument.] */
            if (!this._receivers[endpoint]) {
              this._fsm.handle('attachReceiverLink', endpoint, null, (err) => {
                if (err) {
                  done(err);
                } else {
                  done(null, this._receivers[endpoint]);
                }
              });
            } else {
              /*Codes_SRS_NODE_COMMON_AMQP_16_009: [If a receiver for this endpoint has already been created, the getReceiver method should call the done() method with the existing instance as an argument.] */
              done(null, this._receivers[endpoint]);
            }
          },
          attachReceiverLink: (endpoint: string, linkOptions: any, done: GenericAmqpBaseCallback<ReceiverLink>): void => {
            debug('creating receiver link for: ' + endpoint);
            this._receivers[endpoint] = new ReceiverLink(endpoint, linkOptions, this._rheaSession);
            const permanentErrorHandler = (_err) => {
              delete(this._receivers[endpoint]);
            };

            const operationErrorHandler = (err) => {
              done(err);
            };

            this._receivers[endpoint].on('error', permanentErrorHandler);
            this._receivers[endpoint].on('error', operationErrorHandler);
            /*Codes_SRS_NODE_COMMON_AMQP_06_006: [The `attachReceiverLink` method shall call `attach` on the `ReceiverLink` object.] */
            this._receivers[endpoint].attach((err) => {
              if (err) {
                permanentErrorHandler(err);
                operationErrorHandler(err);
              } else {
                this._receivers[endpoint].removeListener('error', operationErrorHandler);
                debug('receiver link attached: ' + endpoint);
                done(null, this._receivers[endpoint]);
              }
            });
          },
          attachSenderLink: (endpoint: string, linkOptions: any, done: GenericAmqpBaseCallback<any>): void => {
            debug('creating sender link for: ' + endpoint);
            this._senders[endpoint] = new SenderLink(endpoint, linkOptions, this._rheaSession);
            const permanentErrorHandler = (_err) => {
              delete(this._senders[endpoint]);
            };

            const operationErrorHandler = (err) => {
              done(err);
            };

            this._senders[endpoint].on('error', permanentErrorHandler);
            this._senders[endpoint].on('error', operationErrorHandler);
            /*Codes_SRS_NODE_COMMON_AMQP_06_005: [The `attachSenderLink` method shall call `attach` on the `SenderLink` object.] */
            this._senders[endpoint].attach((err) => {
              if (err) {
                permanentErrorHandler(err);
                operationErrorHandler(err);
              } else {
                this._senders[endpoint].removeListener('error', operationErrorHandler);
                done(null, this._senders[endpoint]);
              }
            });
          },
          detachReceiverLink: (endpoint: string, detachCallback: GenericAmqpBaseCallback<any>): void => {
            if (!this._receivers[endpoint]) {
              this._safeCallback(detachCallback);
            } else {
              this._detachLink(this._receivers[endpoint], (err?) => {
                delete(this._receivers[endpoint]);
                this._safeCallback(detachCallback, err);
              });
            }
          },
          detachSenderLink: (endpoint: string, detachCallback: GenericAmqpBaseCallback<any>): void => {
            this._detachLink(this._senders[endpoint], (err?) => {
              delete(this._senders[endpoint]);
              this._safeCallback(detachCallback, err);
            });
          }
        },
        disconnecting: {
          _onEnter: (disconnectCallback, err) => {
            debug('Entering disconnecting state with disconnectCallback: ' + disconnectCallback + ' error of: ' + getErrorName(err));
            const sessionEnd = (callback) => {
              //
              // If a disconnection has already happened then there is no point in trying to send a session close.
              // Just be done.
              //
              if (this._disconnectionOccurred) {
                callback();
              } else {
                //
                // A session close may have already been received from the peer.  If we are disconnecting because of a session error as an example.
                // We should send a session close from our end BUT, we should not expect to receive back another session close in response.
                //
                this._rheaSession.close();
                if (this._sessionCloseOccurred) {
                  callback();
                } else {
                  this._sessionCallback = callback; // So that the session_close handler for this disconnecting state can invoke the callback.
                }
              }
            };

            const disconnect = (callback) => {
              debug('entering disconnect function of disconnecting state');
              if (err) {
                debugErrors('with a disconnecting state err: ' + getErrorName(err));
              }
              //
              // If a disconnection has already occurred there is no point in generating any network traffic.
              //
              if (this._disconnectionOccurred) {
                debug('in disconnecting state - a disconnect had already been detected.  No point in doing anything.');
                callback(err);
              } else {
                //
                // A connection close may have already been received from the peer.  If we are disconnecting because of a connection error as an example.
                // We should send a connection close from our end BUT, we should not expect to receive back another connection close in response.
                //
                debug('disconnect in disconnecting state is about send a close to the peer.');
                this._rheaConnection.close();
                if (this._connectionCloseOccurred) {
                  callback(err);
                } else {
                  this._connectionCallback = callback;
                }
              }
            };

            const detachLink = (link, callback) => {
              if (!link) {
                return callback();
              }

              if (this._sessionCloseOccurred || this._disconnectionOccurred) {
                debugErrors('forceDetaching link: ' + link);
                link.forceDetach(err);
                callback();
              } else {
                debug('cleanly detaching link: ' + link);
                link.detach(callback, err);
              }
            };

            const remainingLinks = [];
            for (const senderEndpoint in this._senders) {
              if (Object.prototype.hasOwnProperty.call(this._senders, senderEndpoint)) {
                remainingLinks.push(this._senders[senderEndpoint]);
                delete this._senders[senderEndpoint];
              }
            }

            for (const receiverEndpoint in this._receivers) {
              if (Object.prototype.hasOwnProperty.call(this._receivers, receiverEndpoint)) {
                remainingLinks.push(this._receivers[receiverEndpoint]);
                  delete this._receivers[receiverEndpoint];
              }
            }

            //
            // Depending on the mode of failure it is possible that no network activity is occurring.
            // However, the SDK *might* not have noticed that yet.
            //
            // If this happens then rundown code will simply stall waiting for a response which will never
            // occur.
            //
            // To deal with this, we encapsulate the rundown code in a function and we invoke it.
            //
            // First, however, we will spin off a timer to execute the very same rundown code in 45 seconds, but that
            // function invocation will use API that do NOT expect a reply.
            //
            // If the initial function invocation actually doesn't stall due to lack of communication, it's
            // last action will be to clear the timeout and consequently the timeout invocation call to the
            // rundown code will NOT occur.
            //
            let rerunWithForceTimer: any = undefined;
            const rundownConnection = (force: boolean) => {
              //
              // Note that _disconnectionOccurred could already be true on the first run (the NON setTimer invocation).
              // Thus, we don't simply want to set it to the value of force.
              //
              if (force) {
                this._disconnectionOccurred = true;
              }
              detachLink(this._cbs, () => {
                /*Codes_SRS_NODE_COMMON_AMQP_16_034: [The `disconnect` method shall detach all open links before disconnecting the underlying AMQP client.]*/
                async.each(remainingLinks, detachLink, () => {
                  sessionEnd((sessionError) => {
                    disconnect((disconnectError) => {
                      clearTimeout(rerunWithForceTimer);
                      manageConnectionHandlers('removeListener');
                      const finalError = err || sessionError || disconnectError;
                      this._fsm.transition('disconnected', disconnectCallback, finalError);
                    });
                  });
                });
              });
            };

            /*Codes_SRS_NODE_COMMON_AMQP_06_007: [While disconnecting, if the run down does not complete within 45 seconds, the code will be re-run with `forceDetach`es.]*/
            rerunWithForceTimer = setTimeout(() => {
              debugErrors('the normal rundown expired without completion.  Do it again with force detaches.');
              rundownConnection(true);
            }, 45000);
            rundownConnection(false);
          },
          session_close: (_context: EventContext) => {
            const err = this._indicatedSessionError;
            this._indicatedSessionError = undefined;
            const callback = this._sessionCallback;
            this._sessionCallback = undefined;
            this._sessionCloseOccurred = true;
            if (callback) {
              callback(err);
            }
          },
          session_error: (context: EventContext) => {
            this._indicatedSessionError = context.session.error;
          },
          connection_error: (context: EventContext) => {
            this._indicatedConnectionError = context.connection.error;
          },
          connection_close: (_context: EventContext) => {
            const err = this._indicatedConnectionError;
            const callback = this._connectionCallback;
            this._indicatedConnectionError = undefined;
            this._connectionCloseOccurred = true;
            /*Codes_SRS_NODE_COMMON_AMQP_16_004: [The disconnect method shall call the done callback when the application/service has been successfully disconnected from the service] */
            if (callback) {
              callback(err);
            }
          },
          error: (_context: EventContext) => {
            debugErrors('ignoring error events while disconnecting');
          },
          disconnected: (_context: EventContext) => {
            this._disconnectionOccurred = true;
          },
          amqpError: (err) => {
            debugErrors('ignoring error event while disconnecting: ' + getErrorName(err));
          },
          '*': () => this._fsm.deferUntilTransition('disconnected')
        }
      }
    });

    this._fsm.on('transition', (transition) => {
      debug(transition.fromState + ' -> ' + transition.toState + ' (action:' + transition.action + ')');
    });
  }