constructor()

in src/common-amqp/amqp_cbs.ts [77:305]


  /**
   * Creates the claims-based-security agent for the given session.
   *
   * The constructor instantiates one sender link and one receiver link for the CBS
   * endpoints and builds the state machine that drives them. The machine has four states:
   *
   * - `detached`  : initial state. Any queued put token operation is failed with the error
   *                 that caused the detach. A `putToken` request queues the operation and
   *                 starts attaching.
   * - `attaching` : attaches the sender link, then the receiver link, then registers the
   *                 handler for put token responses. `putToken` requests are queued.
   * - `attached`  : flushes queued put token operations and sends new ones over the sender link.
   * - `detaching` : detaches both links, then returns to `detached`. Every other input is
   *                 deferred until `detached` is reached.
   *
   * All callbacks handled by the state machine (`attach`, `detach` and `putToken` callbacks)
   * are treated as optional: they are only invoked when supplied.
   *
   * @param session Session object that is stored on the agent and handed to both CBS links.
   */
  constructor(session: Session) {
    super();

    this._rheaSession = session;
    /*Codes_SRS_NODE_AMQP_CBS_16_001: [The `constructor` shall instantiate a `SenderLink` object for the `$cbs` endpoint.]*/
    this._senderLink = new SenderLink(ClaimsBasedSecurityAgent._putTokenSendingEndpoint, null, this._rheaSession);

    /*Codes_SRS_NODE_AMQP_CBS_16_002: [The `constructor` shall instantiate a `ReceiverLink` object for the `$cbs` endpoint.]*/
    this._receiverLink = new ReceiverLink(ClaimsBasedSecurityAgent._putTokenReceivingEndpoint, null, this._rheaSession);

    // Put token operations requested while the links are not attached yet. They are either
    // sent when the `attached` state is entered or failed when the `detached` state is entered.
    this._putTokensNotYetSent = [];

    this._fsm = new machina.Fsm({
      initialState: 'detached',
      states: {
        detached: {
          _onEnter: (callback, err) => {
            // NOTE(review): only the timer is cleared here; nothing in this constructor empties
            // `outstandingPutTokens`. Confirm that they cannot survive a (force) detach, otherwise
            // their callbacks are never invoked and the `length === 1` check in `putToken` will
            // not restart the timer.
            clearTimeout(this._putToken.timeoutTimer); // In the detached state there should be no outstanding put tokens.
            let tokenOperation = this._putTokensNotYetSent.shift();
            while (tokenOperation) {
              // The callback is optional: an operation queued without one must not throw here,
              // otherwise the remaining queued operations and the attach/detach callback below
              // would never be notified.
              if (tokenOperation.callback) {
                tokenOperation.callback(err);
              }
              tokenOperation = this._putTokensNotYetSent.shift();
            }

            /*Codes_SRS_NODE_AMQP_CBS_16_006: [If given as an argument, the `attach` method shall call `callback` with a standard `Error` object if any link fails to attach.]*/
            if (callback) {
              callback(err);
            }
          },
          attach: (callback) => {
            this._fsm.transition('attaching', callback);
          },
          // Already detached: nothing to do except report completion.
          detach: (callback) => { if (callback) callback(); },
          /*Tests_SRS_NODE_AMQP_CBS_16_021: [The `forceDetach()` method shall return immediately if no link is attached.]*/
          forceDetach: () => { return; },
          putToken: (audience, token, callback) => {
            // Queue the operation and attach implicitly; the queue is flushed once `attached` is entered.
            this._putTokensNotYetSent.push({
              audience: audience,
              token: token,
              callback: callback
            });
            this._fsm.transition('attaching');
          }
        },
        attaching: {
          _onEnter: (callback) => {
            // The links are attached sequentially: sender first, then receiver. Any failure goes
            // through `detaching` so that whatever was attached gets cleaned up.
            this._senderLink.attach((err) => {
              if (err) {
                this._fsm.transition('detaching', callback, err);
              } else {
                this._receiverLink.attach((err) => {
                  if (err) {
                    this._fsm.transition('detaching', callback, err);
                  } else {
                    // NOTE(review): this listener is registered on every successful attach and is
                    // never removed in this block. Confirm that `ReceiverLink` drops its listeners
                    // when detached; otherwise each response is handled once per previous attach.
                    this._receiverLink.on('message', (msg) => {
                      //
                      // Regardless of whether we found the put token in the list of outstanding
                      // operations, accept it.  This could be a put token that we previously
                      // timed out.  Be happy.  It made it home, just too late to be useful.
                      //
                      /*Codes_SRS_NODE_AMQP_CBS_16_020: [All responses shall be accepted.]*/
                      this._receiverLink.accept(msg);
                      for (let i = 0; i < this._putToken.outstandingPutTokens.length; i++) {
                        //
                        // Just as a reminder.  For cbs, the message id and the correlation id
                        // always stayed as string values.  They never went on over the wire
                        // as the amqp uuid type.
                        //
                        if (msg.correlation_id === this._putToken.outstandingPutTokens[i].correlationId) {
                          const completedPutToken = this._putToken.outstandingPutTokens[i];
                          this._putToken.outstandingPutTokens.splice(i, 1);
                          //
                          // If this was the last outstanding put token then get rid of the timer trying to clear out expiring put tokens.
                          //
                          if (this._putToken.outstandingPutTokens.length === 0) {
                            clearTimeout(this._putToken.timeoutTimer);
                          }
                          if (completedPutToken.putTokenCallback) {
                            /*Codes_SRS_NODE_AMQP_CBS_16_019: [A put token response of 200 will invoke `putTokenCallback` with null parameters.]*/
                            let error = null;
                            // A response without application properties carries no 200 status code,
                            // so it is reported as a failure instead of throwing. A throw here would
                            // leave the caller waiting forever because the operation has already been
                            // removed from the outstanding list.
                            const responseProperties = msg.application_properties;
                            const statusCode = responseProperties ? responseProperties['status-code'] : undefined;
                            if (statusCode !== 200) {
                              /*Codes_SRS_NODE_AMQP_CBS_16_018: [A put token response not equal to 200 will invoke `putTokenCallback` with an error object of UnauthorizedError.]*/
                              error = new errors.UnauthorizedError(responseProperties ? responseProperties['status-description'] : undefined);
                            }
                            completedPutToken.putTokenCallback(error);
                          }
                          break;
                        }
                      }
                    });
                    this._fsm.transition('attached', callback);
                  }
                });
              }
            });
          },
          detach: (callback, err) => this._fsm.transition('detaching', callback, err),
          forceDetach: () => {
            /*Tests_SRS_NODE_AMQP_CBS_16_022: [The `forceDetach()` method shall call `forceDetach()` on all attached links.]*/
            if (this._senderLink) {
              this._senderLink.forceDetach();
            }
            if (this._receiverLink) {
              this._receiverLink.forceDetach();
            }
            this._fsm.transition('detached');
          },
          putToken: (audience, token, callback) => {
            // Links are not usable yet: queue the operation until `attached` (sent) or `detached` (failed).
            this._putTokensNotYetSent.push({
              audience: audience,
              token: token,
              callback: callback
            });
          }
        },
        attached: {
          _onEnter: (callback) => {
            if (callback) {
              callback();
            }
            // Send everything that was queued while the links were being attached.
            let tokenOperation = this._putTokensNotYetSent.shift();
            while (tokenOperation) {
              this._fsm.handle('putToken', tokenOperation.audience, tokenOperation.token, tokenOperation.callback);
              tokenOperation = this._putTokensNotYetSent.shift();
            }
          },
          /*Codes_SRS_NODE_AMQP_CBS_06_001: [If in the attached state, either the sender or the receiver links gets an error, an error of `azure-iot-amqp-base:error-indicated` will have been indicated on the container object and the cbs will remain in the attached state.  The owner of the cbs MUST detach.] */
          // Already attached: report success, but only if the (optional) callback was supplied.
          attach: (callback) => { if (callback) callback(); },
          detach: (callback, err) => {
            debug('while attached - detach for CBS links ' + this._receiverLink + ' ' + this._senderLink);
            this._fsm.transition('detaching', callback, err);
          },
          forceDetach: () => {
            debugErrors('while attached - force detach for CBS links ' + this._receiverLink + ' ' + this._senderLink);
            /*Tests_SRS_NODE_AMQP_CBS_16_022: [The `forceDetach()` method shall call `forceDetach()` on all attached links.]*/
            this._receiverLink.forceDetach();
            this._senderLink.forceDetach();
            this._fsm.transition('detached');
          },
          putToken: (audience, token, putTokenCallback) => {
            /*SRS_NODE_AMQP_CBS_16_011: [The `putToken` method shall construct an amqp message that contains the following application properties:
            ```
            'operation': 'put-token'
            'type': 'servicebus.windows.net:sastoken'
            'name': <audience>
            ```
            and system properties of
            ```
            'to': '$cbs'
            'messageId': <uuid>
            'reply_to': 'cbs'
            ```
            and a body containing `<sasToken>`.]*/
            const amqpMessage = new AmqpMessage();
            amqpMessage.application_properties = {
              operation: 'put-token',
              type: 'servicebus.windows.net:sastoken',
              name: audience
            };
            amqpMessage.body = token;
            amqpMessage.to = '$cbs';
            //
            // For CBS, the message id and correlation id are encoded as string
            //
            amqpMessage.message_id = uuid.v4();
            amqpMessage.reply_to = 'cbs';

            // Track the operation so that the response (matched on correlation id), a send
            // failure, or the expiration timer can complete it.
            const outstandingPutToken: PutTokenOperation = {
              putTokenCallback: putTokenCallback,
              expirationTime: Math.round(Date.now() / 1000) + this._putToken.numberOfSecondsToTimeout,
              correlationId: amqpMessage.message_id
            };

            this._putToken.outstandingPutTokens.push(outstandingPutToken);
            //
            // If this is the first put token then start trying to time it out.
            //
            if (this._putToken.outstandingPutTokens.length === 1) {
              this._putToken.timeoutTimer = setTimeout(this._removeExpiredPutTokens.bind(this), this._putToken.putTokenTimeOutExaminationInterval);
            }
            /*Codes_SRS_NODE_AMQP_CBS_16_012: [The `putToken` method shall send this message over the `$cbs` sender link.]*/
            this._senderLink.send(amqpMessage, (err) => {
              if (err) {
                // Find the operation in the outstanding array.  Remove it from the array since, well, it's not outstanding anymore.
                // Since we may have arrived here asynchronously, we simply can't assume that it is the end of the array.  But,
                // it's more likely near the end.
                // If the token has expired it won't be found, but that's ok because its callback will have been called when it was removed.
                for (let i = this._putToken.outstandingPutTokens.length - 1; i >= 0; i--) {
                  if (this._putToken.outstandingPutTokens[i].correlationId === amqpMessage.message_id) {
                    const outStandingPutTokenInError = this._putToken.outstandingPutTokens[i];
                    this._putToken.outstandingPutTokens.splice(i, 1);
                    //
                    // This was the last outstanding put token.  No point in having a timer around trying to time nothing out.
                    //
                    if (this._putToken.outstandingPutTokens.length === 0) {
                      clearTimeout(this._putToken.timeoutTimer);
                    }
                    /*Codes_SRS_NODE_AMQP_CBS_16_013: [The `putToken` method shall call `callback` (if supplied) if the `send` generates an error such that no response from the service will be forthcoming.]*/
                    // Guarded like the response path: the callback is optional.
                    if (outStandingPutTokenInError.putTokenCallback) {
                      outStandingPutTokenInError.putTokenCallback(err);
                    }
                    break;
                  }
                }
              }
            });
          }
        },
        detaching: {
          _onEnter: (forwardedCallback, err) => {
            /*Codes_SRS_NODE_AMQP_CBS_16_008: [`detach` shall detach both sender and receiver links and return the state machine to the `detached` state.]*/
            if (err) {
              debugErrors('Detaching because of ' + getErrorName(err));
            }
            const links = [this._senderLink, this._receiverLink];
            async.each(links, (link, callback) => {
              if (link) {
                debug('while detaching for link: ' + link);
                link.detach(callback, err);
              } else {
                callback();
              }
            }, () => {
              // Whatever the individual link results were, end up in `detached`, forwarding the
              // original callback and the error that triggered the detach (if any).
              this._fsm.transition('detached', forwardedCallback, err);
            });
          },
          // Any input received while detaching is replayed once the `detached` state is reached.
          '*': (_callback) => this._fsm.deferUntilTransition('detached')
        }
      }
    });
  }