in src/common-amqp/amqp_cbs.ts [77:305]
/**
 * Builds the claims-based-security agent on top of the given session.
 *
 * Creates one sender link and one receiver link for the `$cbs` endpoint and the state
 * machine that drives them. The machine has four states -- `detached` (initial),
 * `attaching`, `attached` and `detaching` -- and reacts to the `attach`, `detach`,
 * `forceDetach` and `putToken` inputs.
 *
 * Put-token requests received before the links are attached are queued in
 * `_putTokensNotYetSent`; they are sent once `attached` is entered, or completed with
 * the detach error (possibly undefined) when the machine falls back to `detached`.
 *
 * All callbacks handled by the state machine are optional: a missing callback is
 * skipped rather than invoked.
 *
 * @param session Session object handed to both `$cbs` links.
 */
constructor(session: Session) {
  super();
  this._rheaSession = session;
  /*Codes_SRS_NODE_AMQP_CBS_16_001: [The `constructor` shall instantiate a `SenderLink` object for the `$cbs` endpoint.]*/
  this._senderLink = new SenderLink(ClaimsBasedSecurityAgent._putTokenSendingEndpoint, null, this._rheaSession);
  /*Codes_SRS_NODE_AMQP_CBS_16_002: [The `constructor` shall instantiate a `ReceiverLink` object for the `$cbs` endpoint.]*/
  this._receiverLink = new ReceiverLink(ClaimsBasedSecurityAgent._putTokenReceivingEndpoint, null, this._rheaSession);
  this._putTokensNotYetSent = [];
  this._fsm = new machina.Fsm({
    initialState: 'detached',
    states: {
      detached: {
        _onEnter: (callback, err) => {
          // In the detached state there should be no outstanding put tokens.
          // NOTE(review): only the expiration timer is cleared here; `outstandingPutTokens` itself is
          // not emptied and its callbacks are not invoked in this handler -- confirm that is intended.
          clearTimeout(this._putToken.timeoutTimer);
          // Complete every put-token request that was queued but never sent, passing along
          // whatever error (possibly undefined) brought us back to `detached`.
          let tokenOperation = this._putTokensNotYetSent.shift();
          while (tokenOperation) {
            // Guarded so that one request queued without a callback cannot abort the drain loop.
            if (tokenOperation.callback) {
              tokenOperation.callback(err);
            }
            tokenOperation = this._putTokensNotYetSent.shift();
          }
          /*Codes_SRS_NODE_AMQP_CBS_16_006: [If given as an argument, the `attach` method shall call `callback` with a standard `Error` object if any link fails to attach.]*/
          if (callback) {
            callback(err);
          }
        },
        attach: (callback) => {
          this._fsm.transition('attaching', callback);
        },
        detach: (callback) => { if (callback) callback(); },
        /*Tests_SRS_NODE_AMQP_CBS_16_021: [The `forceDetach()` method shall return immediately if no link is attached.]*/
        forceDetach: () => { return; },
        putToken: (audience, token, callback) => {
          // Queue the request and start attaching; it is sent when `attached` is entered.
          this._putTokensNotYetSent.push({
            audience: audience,
            token: token,
            callback: callback
          });
          this._fsm.transition('attaching');
        }
      },
      attaching: {
        _onEnter: (callback) => {
          // Attach the sender first, then the receiver. Any failure goes through `detaching`
          // so that both links are detached before the caller is told about the error.
          this._senderLink.attach((err) => {
            if (err) {
              this._fsm.transition('detaching', callback, err);
            } else {
              this._receiverLink.attach((err) => {
                if (err) {
                  this._fsm.transition('detaching', callback, err);
                } else {
                  // NOTE(review): this listener is added on every successful attach of the same
                  // receiver link object -- confirm listeners cannot accumulate across re-attach.
                  this._receiverLink.on('message', (msg) => {
                    //
                    // Regardless of whether we found the put token in the list of outstanding
                    // operations, accept it. This could be a put token that we previously
                    // timed out. Be happy. It made it home, just too late to be useful.
                    //
                    /*Codes_SRS_NODE_AMQP_CBS_16_020: [All responses shall be accepted.]*/
                    this._receiverLink.accept(msg);
                    for (let i = 0; i < this._putToken.outstandingPutTokens.length; i++) {
                      //
                      // Just as a reminder. For cbs, the message id and the correlation id
                      // always stayed as string values. They never went on over the wire
                      // as the amqp uuid type.
                      //
                      if (msg.correlation_id === this._putToken.outstandingPutTokens[i].correlationId) {
                        const completedPutToken = this._putToken.outstandingPutTokens[i];
                        this._putToken.outstandingPutTokens.splice(i, 1);
                        //
                        // If this was the last outstanding put token then get rid of the timer trying to clear out expiring put tokens.
                        //
                        if (this._putToken.outstandingPutTokens.length === 0) {
                          clearTimeout(this._putToken.timeoutTimer);
                        }
                        if (completedPutToken.putTokenCallback) {
                          /*Codes_SRS_NODE_AMQP_CBS_16_019: [A put token response of 200 will invoke `putTokenCallback` with null parameters.]*/
                          let error = null;
                          // A response without application properties carries no status code. Treat it
                          // as a non-200 response instead of throwing, so the callback is still invoked
                          // for an operation that has already been removed from the outstanding list.
                          const responseProperties = msg.application_properties;
                          const statusCode = responseProperties ? responseProperties['status-code'] : undefined;
                          if (statusCode !== 200) {
                            /*Codes_SRS_NODE_AMQP_CBS_16_018: [A put token response not equal to 200 will invoke `putTokenCallback` with an error object of UnauthorizedError.]*/
                            error = new errors.UnauthorizedError(responseProperties ? responseProperties['status-description'] : undefined);
                          }
                          completedPutToken.putTokenCallback(error);
                        }
                        break;
                      }
                    }
                  });
                  this._fsm.transition('attached', callback);
                }
              });
            }
          });
        },
        detach: (callback, err) => this._fsm.transition('detaching', callback, err),
        forceDetach: () => {
          /*Tests_SRS_NODE_AMQP_CBS_16_022: [The `forceDetach()` method shall call `forceDetach()` on all attached links.]*/
          if (this._senderLink) {
            this._senderLink.forceDetach();
          }
          if (this._receiverLink) {
            this._receiverLink.forceDetach();
          }
          this._fsm.transition('detached');
        },
        putToken: (audience, token, callback) => {
          // Still attaching: just queue the request. It is flushed on entering `attached`
          // or completed with the error on entering `detached`.
          this._putTokensNotYetSent.push({
            audience: audience,
            token: token,
            callback: callback
          });
        }
      },
      attached: {
        _onEnter: (callback) => {
          if (callback) {
            callback();
          }
          // Send every put-token request that was queued while the links were not attached.
          let tokenOperation = this._putTokensNotYetSent.shift();
          while (tokenOperation) {
            this._fsm.handle('putToken', tokenOperation.audience, tokenOperation.token, tokenOperation.callback);
            tokenOperation = this._putTokensNotYetSent.shift();
          }
        },
        /*Codes_SRS_NODE_AMQP_CBS_06_001: [If in the attached state, either the sender or the receiver links gets an error, an error of `azure-iot-amqp-base:error-indicated` will have been indicated on the container object and the cbs will remain in the attached state. The owner of the cbs MUST detach.] */
        attach: (callback) => {
          // Already attached: nothing to do beyond reporting success, if anyone asked.
          if (callback) {
            callback();
          }
        },
        detach: (callback, err) => {
          debug('while attached - detach for CBS links ' + this._receiverLink + ' ' + this._senderLink);
          this._fsm.transition('detaching', callback, err);
        },
        forceDetach: () => {
          debugErrors('while attached - force detach for CBS links ' + this._receiverLink + ' ' + this._senderLink);
          /*Tests_SRS_NODE_AMQP_CBS_16_022: [The `forceDetach()` method shall call `forceDetach()` on all attached links.]*/
          this._receiverLink.forceDetach();
          this._senderLink.forceDetach();
          this._fsm.transition('detached');
        },
        putToken: (audience, token, putTokenCallback) => {
          /*SRS_NODE_AMQP_CBS_16_011: [The `putToken` method shall construct an amqp message that contains the following application properties:
          ```
          'operation': 'put-token'
          'type': 'servicebus.windows.net:sastoken'
          'name': <audience>
          ```
          and system properties of
          ```
          'to': '$cbs'
          'messageId': <uuid>
          'reply_to': 'cbs'
          ```
          and a body containing `<sasToken>`.]*/
          const amqpMessage = new AmqpMessage();
          amqpMessage.application_properties = {
            operation: 'put-token',
            type: 'servicebus.windows.net:sastoken',
            name: audience
          };
          amqpMessage.body = token;
          amqpMessage.to = '$cbs';
          //
          // For CBS, the message id and correlation id are encoded as string
          //
          amqpMessage.message_id = uuid.v4();
          amqpMessage.reply_to = 'cbs';
          // Track the request so the response (matched on correlation id) or the expiration
          // timer can complete it. The expiration time is expressed in seconds.
          const outstandingPutToken: PutTokenOperation = {
            putTokenCallback: putTokenCallback,
            expirationTime: Math.round(Date.now() / 1000) + this._putToken.numberOfSecondsToTimeout,
            correlationId: amqpMessage.message_id
          };
          this._putToken.outstandingPutTokens.push(outstandingPutToken);
          //
          // If this is the first put token then start trying to time it out.
          //
          if (this._putToken.outstandingPutTokens.length === 1) {
            this._putToken.timeoutTimer = setTimeout(this._removeExpiredPutTokens.bind(this), this._putToken.putTokenTimeOutExaminationInterval);
          }
          /*Codes_SRS_NODE_AMQP_CBS_16_012: [The `putToken` method shall send this message over the `$cbs` sender link.]*/
          this._senderLink.send(amqpMessage, (err) => {
            if (err) {
              // Find the operation in the outstanding array. Remove it from the array since, well, it's not outstanding anymore.
              // Since we may have arrived here asynchronously, we simply can't assume that it is the end of the array. But,
              // it's more likely near the end.
              // If the token has expired it won't be found, but that's ok because its callback will have been called when it was removed.
              for (let i = this._putToken.outstandingPutTokens.length - 1; i >= 0; i--) {
                if (this._putToken.outstandingPutTokens[i].correlationId === amqpMessage.message_id) {
                  const outStandingPutTokenInError = this._putToken.outstandingPutTokens[i];
                  this._putToken.outstandingPutTokens.splice(i, 1);
                  //
                  // This was the last outstanding put token. No point in having a timer around trying to time nothing out.
                  //
                  if (this._putToken.outstandingPutTokens.length === 0) {
                    clearTimeout(this._putToken.timeoutTimer);
                  }
                  /*Codes_SRS_NODE_AMQP_CBS_16_013: [The `putToken` method shall call `callback` (if supplied) if the `send` generates an error such that no response from the service will be forthcoming.]*/
                  if (outStandingPutTokenInError.putTokenCallback) {
                    outStandingPutTokenInError.putTokenCallback(err);
                  }
                  break;
                }
              }
            }
          });
        }
      },
      detaching: {
        _onEnter: (forwardedCallback, err) => {
          /*Codes_SRS_NODE_AMQP_CBS_16_008: [`detach` shall detach both sender and receiver links and return the state machine to the `detached` state.]*/
          if (err) {
            debugErrors('Detaching because of ' + getErrorName(err));
          }
          const links = [this._senderLink, this._receiverLink];
          // Detach both links, then move to `detached`. The completion handler ignores any
          // error reported by the links and forwards the original callback and error instead.
          async.each(links, (link, callback) => {
            if (link) {
              debug('while detaching for link: ' + link);
              link.detach(callback, err);
            } else {
              callback();
            }
          }, () => {
            this._fsm.transition('detached', forwardedCallback, err);
          });
        },
        // Any input received while detaching is deferred until `detached` is reached.
        '*': (_callback) => this._fsm.deferUntilTransition('detached')
      }
    }
  });
}