constructor()

in src/common-amqp/receiver_link.ts [79:396]


  /**
   * Builds the receiver link and its internal state machine.
   *
   * Nothing is attached here: the state machine starts in the `detached` state and
   * the underlying `rhea` receiver is only opened when `attach` is handled (which also
   * happens automatically when a `message` listener is added to this object).
   *
   * @param linkAddress Address of the link; used as the `source` of the options handed to
   *                    `open_receiver` and in debug output.
   * @param linkOptions Optional extra link options; every key is copied over the combined
   *                    options (so it can override `source`). May be falsy.
   * @param session     `rhea` session used to open the receiver when attaching.
   */
  constructor(linkAddress: string, linkOptions: ReceiverOptions, session: Session) {
    super();
    this._linkAddress = linkAddress;
    this._rheaSession = session;
    this._undisposedDeliveries = [];
    this._combinedOptions = {
      source: linkAddress
    };

    if (linkOptions) {
      for (const k in linkOptions) {
        this._combinedOptions[k] = linkOptions[k];
      }
    }

    // The rhea link events are simply fed into the state machine; what they mean depends on the current state.
    const receiverOpenHandler = (context: EventContext): void => {
      this._fsm.handle('receiverOpenEvent', context);
    };
    const receiverCloseHandler = (context: EventContext): void => {
      this._fsm.handle('receiverCloseEvent', context);
    };
    const receiverErrorHandler = (context: EventContext): void => {
      this._fsm.handle('receiverErrorEvent', context);
    };

    /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_012: [If a `message` event is emitted by the `rhea` link object, the `ReceiverLink` object shall emit a `message` event with the same content.]*/
    const receiverMessageHandler = (context: EventContext): void => {
      // Remember the delivery so that a later accept/reject/abandon can look it up by message.
      this._undisposedDeliveries.push({ msg: context.message as any, delivery: context.delivery });
      this.emit('message', context.message);
    };

    // `operation` is the name of the emitter method to invoke on the rhea receiver ('on' or 'removeListener'),
    // so the same four handlers are registered and unregistered symmetrically.
    const manageReceiverHandlers = (operation: string) => {
      this._rheaReceiver[operation]('receiver_error', receiverErrorHandler);
      this._rheaReceiver[operation]('receiver_close', receiverCloseHandler);
      this._rheaReceiver[operation]('receiver_open', receiverOpenHandler);
      this._rheaReceiver[operation]('message', receiverMessageHandler);
    };

    // Used by accept/reject/abandon while the link is not attached: there is no delivery to settle, so the
    // caller is told the message lock is lost. The callback is optional for these operations (see the
    // attached-state handlers), so it must be guarded instead of being invoked blindly.
    const reportLockLost = (callback?: (err?: Error) => void): void => {
      if (callback) {
        callback(new errors.DeviceMessageLockLostError());
      }
    };

    this._fsm = new machina.Fsm({
      /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_001: [The `ReceiverLink` internal state machine shall be initialized in the `detached` state.]*/
      initialState: 'detached',
      namespace: 'receiverLink',
      states: {
        detached: {
          _onEnter: (callback, err) => {
            this._rheaReceiver = null;
            this._rheaReceiverName = null;

            // An error is either handed to the pending callback or, when nobody is waiting, emitted as an event.
            if (callback) {
              this._safeCallback(callback, err);
            } else if (err) {
              /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_011: [If a `receiver_close` or `receiver_error` event is emitted by the `rhea` link object, the `ReceiverLink` object shall forward that error to the client.]*/
              this.emit('error', err);
            }
          },
          attach: (callback) => {
            this._fsm.transition('attaching', callback);
          },
          detach: (callback, err) => {
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_004: [If the `ReceiverLink` is already in the detached state an invocation of `detach` shall immediately invoke the callback with the (potentially) supplied error parameter.] */
            debug(this.toString() + ': while detached - detach for receiver link ' + this._linkAddress);
            this._safeCallback(callback, err);
          },
          forceDetach: () => {
            debugErrors(this.toString() + ': while detached - force detach for receiver link ' + this._linkAddress);
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_028: [The `forceDetach` method shall return immediately if the link is already detached.]*/
            return;
          },
          accept: (_message, callback) => reportLockLost(callback),
          reject: (_message, callback) => reportLockLost(callback),
          abandon: (_message, callback) => reportLockLost(callback)
        },
        attaching: {
          _onEnter: (callback) => {
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_004: [The `attach` method shall use the stored instance of the `rhea` session object to attach a new link object with the combined `linkAddress` and `linkOptions` provided when creating the `ReceiverLink` instance.]*/
            this._attachingCallback = callback;
            this._indicatedError = undefined;
            this._receiverCloseOccurred = false;
            this._rheaReceiverName = 'rheaReceiver_' + uuid.v4();
            this._combinedOptions.name = this._rheaReceiverName;
            debug(this.toString() + ': attaching receiver name: ' + this._rheaReceiverName + ' with address: ' + this._linkAddress);
            //
            // According to the rhea maintainers, one can depend on that fact that no actual network activity
            // will occur until the nextTick() after the call to open_receiver.  Because of that, one can
            // put the event handlers on the rhea link returned from the open_receiver call and be assured
            // that the listeners are in place BEFORE any possible events will be emitted on the link.
            //
            this._rheaReceiver = this._rheaSession.open_receiver(this._combinedOptions);
            manageReceiverHandlers('on');
          },
          receiverOpenEvent: (context: EventContext) => {
            debug(this.toString() + ': In receiver attaching state - open event for ' + context.receiver.name);
            const callback = this._attachingCallback;
            this._attachingCallback = null;
            this._fsm.transition('attached', callback);
          },
          receiverErrorEvent: (context: EventContext) => {
            debugErrors(this.toString() + ': In receiver attaching state - error event for ' + context.receiver.name + ' error is: ' + getErrorName(context.receiver.error));
            this._indicatedError = context.receiver.error;
            //
            // We don't transition at this point in that we are guaranteed that the error will be followed by a receiver_close
            // event.
            //
          },
          receiverCloseEvent: (context: EventContext) => {
            debug(this.toString() + ': In receiver attaching state - close event for ' + context.receiver.name);
            //
            // We enabled the event listeners on the onEnter handler.  They are to stay alive until we
            // are about to transition to the detached state.
            // We are about to transition to the detached state, so clean up.
            //
            manageReceiverHandlers('removeListener');
            const error = this._indicatedError;
            const callback = this._attachingCallback;
            this._indicatedError = undefined;
            this._attachingCallback = undefined;
            this._receiverCloseOccurred = true;
            this._fsm.transition('detached', callback, error);
          },
          attach: (null),
          detach: (callback, err) => {
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_002: [If the `detach` method is invoked on the `ReceiverLink` while still attaching, the ReceiverLink shall detach.  It will indicate the error to the callback for the `detach` as well as the callback to the `attach`.] */
            debug(this.toString() + ': Detaching while attaching of rhea receiver link ' + this._linkAddress);
            manageReceiverHandlers('removeListener');
            //
            // We may have a callback outstanding from the request that started the attaching.
            // We will signal to that callback that an error occurred. We will also invoke the callback supplied
            // for this detach.
            //
            const error = err || this._indicatedError || new Error('Unexpected link detach while attaching');
            const attachingCallback = this._attachingCallback;
            this._indicatedError = undefined;
            this._attachingCallback = undefined;
            if (attachingCallback) {
              attachingCallback(error);
            }
            this._fsm.transition('detached', callback, error);
          },
          forceDetach: (err) => {
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_003: [If the `forceDetach` method is invoked on the `ReceiverLink` while still attaching, the ReceiverLink shall detach.  With the error supplied to the forceDetach, the `attach` callback will also be invoked.  If the error is NOT falsy it will also be emitted as the argument to the `error` event.] */
            debug(this.toString() + ': Force detaching while attaching of rhea receiver link ' + this._linkAddress);
            manageReceiverHandlers('removeListener');
            const error = err || this._indicatedError;
            const attachingCallback = this._attachingCallback;
            this._indicatedError = undefined;
            this._attachingCallback = undefined;
            //
            // The attach may have been requested without a callback (the `newListener` hook below does exactly
            // that), so only invoke it when one was actually supplied; otherwise we would throw here and never
            // reach the detached state.
            //
            if (attachingCallback) {
              attachingCallback(error);
            }
            this._fsm.transition('detached', undefined, error);
          },
          accept: (_message, callback) => reportLockLost(callback),
          reject: (_message, callback) => reportLockLost(callback),
          abandon: (_message, callback) => reportLockLost(callback)
        },
        attached: {
          _onEnter: (callback, err) => {
            if (callback) callback(err);
          },
          receiverErrorEvent: (context: EventContext) => {
            debugErrors(this.toString() + ': In receiver attached state - error event for ' + context.receiver.name + ' error is: ' + getErrorName(context.receiver.error));
            this._indicatedError = context.receiver.error;
            //
            // We don't transition at this point in that we are guaranteed that the error will be followed by a receiver_close
            // event.
            //
          },
          receiverCloseEvent: (context: EventContext) => {
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_009: [If a `receiver_close` event received with no preceding error, the link shall be closed with no error.] */
            //
            // We have a close (which is how the amqp detach performative is surfaced). It could be because of an error that was
            // already indicated by the receiver_error event. Or it could simply be that for some reason (disconnect tests?)
            // the service side had decided to shut down the link.
            //
            const error = this._indicatedError; // This might be undefined.
            this._indicatedError = undefined;
            this._receiverCloseOccurred = true;
            debugErrors(this.toString() + ': In receiver attached state - close event for ' + context.receiver.name + ' already indicated error is: ' + getErrorName(error));
            if (error) {
              // With an error the link does not transition by itself; the error is surfaced on the rhea container instead.
              context.container.emit('azure-iot-amqp-base:error-indicated', error);
            } else {
              this._fsm.transition('detaching');
            }
          },
          attach: (callback) => {
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_001: [The `attach` method shall immediately invoke the `callback` if already in an attached state.] */
            this._safeCallback(callback);
          },
          detach: (callback, err) => {
            debug(this.toString() + ': While attached - detach for receiver link ' + this._linkAddress + ' callback: ' + callback + ' error: ' + getErrorName(err));
            this._fsm.transition('detaching', callback, err);
          },
          forceDetach: (err) => {
            debugErrors(this.toString() + ': While attached - force detach for receiver link ' + this._linkAddress);
            manageReceiverHandlers('removeListener');
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_027: [** The `forceDetach` method shall call the `remove` method on the underlying `rhea` link object.]*/
            this._rheaReceiver.remove();
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_008: [The `forceDetach` method shall cause an `error` event to be emitted on the `ReceiverLink` if an error is supplied.] */
            this._fsm.transition('detached', undefined, err);
          },
          accept: (message, callback) => {
            const delivery = this._findDeliveryRecord(message);
            if (delivery) {
              /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_022: [** The `accept` method shall work whether a `callback` is specified or not, and call the callback with a `result.MessageCompleted` object if a callback is specified.]*/
              delivery.accept();
              this._safeCallback(callback, null, new results.MessageCompleted());
            } else {
              this._safeCallback(callback, new errors.DeviceMessageLockLostError());
            }
          },
          reject: (message, callback) => {
            const delivery = this._findDeliveryRecord(message);
            if (delivery) {
              /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_023: [** The `reject` method shall work whether a `callback` is specified or not, and call the callback with a `result.MessageRejected` object if a callback is specified.]*/
              delivery.reject();
              this._safeCallback(callback, null, new results.MessageRejected());
            } else {
              this._safeCallback(callback, new errors.DeviceMessageLockLostError());
            }
          },
          abandon: (message, callback) => {
            const delivery = this._findDeliveryRecord(message);
            if (delivery) {
              /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_024: [** The `abandon` method shall work whether a `callback` is specified or not, and call the callback with a `result.MessageAbandoned` object if a callback is specified.]*/
              delivery.release();
              this._safeCallback(callback, null, new results.MessageAbandoned());
            } else {
              this._safeCallback(callback, new errors.DeviceMessageLockLostError());
            }
          }
        },
        detaching: {
          _onEnter: (callback, err) => {
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_025: [The `detach` method shall call the `callback` with an `Error` that caused the detach whether it succeeds or fails to cleanly detach the link.]*/
            debug(this.toString() + ': Detaching of rhea receiver link ' + this._linkAddress);
            this._detachingCallback = callback;
            this._indicatedError = err;
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_009: [The `detach` method shall detach the link created by `rhea` object.] */
            this._rheaReceiver.close();
            if (this._receiverCloseOccurred) {
              //
              // There will be no response from the peer to our detach (close).  Therefore no event handler will be invoked.  Simply
              // transition to detached now.
              //
              // NOTE(review): unlike every other path into the detached state, the rhea listeners are not removed
              // here before transitioning - confirm that this is intentional.
              //
              this._detachingCallback = undefined;
              this._indicatedError = undefined;
              this._fsm.transition('detached', callback, err);
            }
          },
          receiverErrorEvent: (context: EventContext) => {
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_006: [An error occurring during a detach will be indicated in the error result of the `detach`.] */
            debugErrors(this.toString() + ': In receiver detaching state - error event for ' + context.receiver.name + ' error is: ' + getErrorName(context.receiver.error));
            // The error that caused the detach (if any) takes precedence over one raised while detaching.
            this._indicatedError = this._indicatedError || context.receiver.error;
            //
            // We don't transition at this point in that we are guaranteed that the error will be followed by a receiver_close
            // event.
            //
          },
          receiverCloseEvent: (context: EventContext) => {
            debugErrors(this.toString() + ': In receiver detaching state - close event for ' + context.receiver.name + ' already indicated error is: ' + getErrorName(this._indicatedError));
            const error = this._indicatedError;
            const callback = this._detachingCallback;
            this._detachingCallback = undefined;
            this._indicatedError = undefined;
            manageReceiverHandlers('removeListener');
            this._fsm.transition('detached', callback, error);
          },
          detach: (callback, err) => {
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_007: [If `detach` invoked while already detaching, it's callback will be invoked with an error.  Whatever caused the original detaching will proceed.] */
            //
            // Note that we are NOT transitioning to the detached state.
            // We are going to give the code a chance to complete normally.
            // The caller was free to invoke forceDetach.  That handler will
            // ALWAYS transition to the detached state.
            //
            debug(this.toString() + ': While detaching - detach for receiver link ' + this._linkAddress);
            if (callback) {
              err = err || new Error('Detached invoked while detaching.');
              this._safeCallback(callback, err);
            }
          },
          forceDetach: (err) => {
            /*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_005: [If `forceDetach` invoked while detaching, the detach will be completed with the error supplied to the `forceDetach` or an error indicating that the `detach` was preempted by the `forceDetach`.] */
            debugErrors(this.toString() + ': While detaching - Force detaching for receiver link ' + this._linkAddress);
            this._rheaReceiver.remove();
            const detachCallback = this._detachingCallback;
            const error = err || this._indicatedError || new Error('Detach preempted by force');
            this._detachingCallback = undefined;
            this._indicatedError = undefined;
            manageReceiverHandlers('removeListener');
            if (detachCallback) {
              detachCallback(error);
            }
            // Only the error explicitly supplied to forceDetach (if any) is passed on to the detached state.
            this._fsm.transition('detached', undefined, err);
          },
          // Any other input received while detaching is replayed once the detached state is reached.
          '*': () => {
            this._fsm.deferUntilTransition('detached');
          }
        }
      }
    });

    this._fsm.on('transition', (transition) => {
      debug(this.toString() + ': ' + transition.fromState + ' -> ' + transition.toState + ' (action:' + transition.action + ')');
    });

    this.on('removeListener', (eventName) => {
      // stop listening for AMQP events if our consumers stop listening for our events
      if (eventName === 'message' && this.listeners('message').length === 0) {
        this._fsm.handle('detach');
      }
    });

    this.on('newListener', (eventName) => {
      // lazy-init AMQP event listeners
      if (eventName === 'message') {
        this._fsm.handle('attach');
      }
    });
  }