in src/amqp.ts [84:450]
constructor(config: Client.TransportConfigOptions, amqpBase?: Base) {
super();
this._amqp = amqpBase ? amqpBase : new Base(true);
this._config = config;
this._renewalTimeout = null;
this._amqp.setDisconnectHandler((err) => {
this._fsm.handle('amqpError', err);
});
this._c2dErrorListener = (err) => {
debug('Error on the C2D link: ' + err.toString());
this._c2dLink = null;
};
this._feedbackErrorListener = (err) => {
debug('Error on the message feedback link: ' + err.toString());
this._feedbackReceiver = null;
};
this._fileNotificationErrorListener = (err) => {
debug('Error on the file notification link: ' + err.toString());
this._fileNotificationReceiver = null;
};
this._fsm = new machina.Fsm({
namespace: 'azure-iothub:Amqp',
initialState: 'disconnected',
states: {
disconnected: {
_onEnter: (err, callback) => {
if (err) {
if (callback) {
callback(err);
} else {
this.emit('disconnect', err);
}
} else {
if (callback) {
callback();
}
}
},
connect: (callback) => {
this._fsm.transition('connecting', callback);
},
disconnect: (callback) => callback(),
send: (amqpMessage, deviceEndpoint, callback) => {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_025: [The `send` method shall connect and authenticate the transport if it is disconnected.]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_026: [The `send` method shall call its callback with an error if connecting and/or authenticating the transport fails.]*/
callback(err);
} else {
this._fsm.handle('send', amqpMessage, deviceEndpoint, callback);
}
});
},
getFeedbackReceiver: (callback) => {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_033: [The `getFeedbackReceiver` method shall connect and authenticate the transport if it is disconnected.]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_034: [The `getFeedbackReceiver` method shall call its callback with an error if the transport fails to connect or authenticate.]*/
callback(err);
} else {
this._fsm.handle('getFeedbackReceiver', callback);
}
});
},
getFileNotificationReceiver: (callback) => {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_036: [The `getFileNotificationReceiver` method shall connect and authenticate the transport if it is disconnected.]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_037: [The `getFileNotificationReceiver` method shall call its callback with an error if the transport fails to connect or authenticate.]*/
callback(err);
} else {
this._fsm.handle('getFileNotificationReceiver', callback);
}
});
},
updateSharedAccessSignature: (_updatedSAS, callback) => {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_032: [The `updateSharedAccessSignature` shall not establish a connection if the transport is disconnected, but should use the new shared access signature on the next manually initiated connection attempt.]*/
callback();
},
updateAccessToken: (_tokenValue, callback) => {
callback();
},
amqpError: (err) => {
debug('Late arriving error received while in disconnected state.');
if (err) {
debug(err.toString());
}
}
},
connecting: {
_onEnter: (callback) => {
const config: AmqpBaseTransportConfig = {
uri: this._getConnectionUri(),
userAgentString: packageJson.name + '/' + packageJson.version
};
debug('connecting');
this._amqp.connect(config, (err, _result) => {
if (err) {
debug('failed to connect' + err.toString());
this._fsm.transition('disconnected', err, callback);
} else {
debug('connected');
this._fsm.transition('authenticating', callback);
}
});
},
disconnect: (callback) => {
this._fsm.transition('disconnecting', null, callback);
},
amqpError: (err) => {
this._fsm.transition('disconnecting', err);
},
'*': () => this._fsm.deferUntilTransition()
},
authenticating: {
_onEnter: (callback) => {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_06_001: [`initializeCBS` shall be invoked.]*/
this._amqp.initializeCBS((err) => {
if (err) {
debug('error trying to initialize CBS: ' + err.toString());
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_06_002: [If `initializeCBS` is not successful then the client will remain disconnected and the callback, if provided, will be invoked with an error object.]*/
this._fsm.transition('disconnecting', err, callback);
} else {
debug('CBS initialized');
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_06_003: [If `initializeCBS` is successful, `putToken` shall be invoked with the first parameter audience, created from the sr of the sas signature, the next parameter of the actual sas, and a callback.]*/
if (this._config.sharedAccessSignature) {
const audience = SharedAccessSignature.parse(this._config.sharedAccessSignature.toString(), ['sr', 'sig', 'se']).sr;
const isApplicationSuppliedSas = typeof (this._config.sharedAccessSignature) === 'string';
const sasToken = isApplicationSuppliedSas ? this._config.sharedAccessSignature as string : (this._config.sharedAccessSignature as SharedAccessSignature).extend(anHourFromNow());
this._amqp.putToken(audience, sasToken, (err) => {
if (err) {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_06_004: [** If `putToken` is not successful then the client will remain disconnected and the callback, if provided, will be invoked with an error object.]*/
this._fsm.transition('disconnecting', err, callback);
} else {
this._fsm.transition('authenticated', isApplicationSuppliedSas, callback);
}
});
} else if (this._config.tokenCredential) {
this.getToken().then((accessToken) => {
const tokenValue = this._bearerTokenPrefix + accessToken.token;
this._amqp.putToken(this._config.tokenScope, tokenValue, (err) => {
if (err) {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_06_004: [** If `putToken` is not successful then the client will remain disconnected and the callback, if provided, will be invoked with an error object.]*/
this._fsm.transition('disconnecting', err, callback);
} else {
this._renewalNumberOfMilliseconds = Math.max(1000, (2 / 3) * (accessToken.expiresOnTimestamp - Date.now()));
this._fsm.transition('authenticated', false, callback);
}
});
}).catch((err) => {
this._fsm.transition('disconnecting', err, callback);
});
}
}
});
},
disconnect: (callback) => {
this._fsm.transition('disconnecting', null, callback);
},
amqpError: (err) => {
this._fsm.transition('disconnecting', err);
},
'*': () => this._fsm.deferUntilTransition()
},
authenticated: {
_onEnter: (isApplicationSuppliedSas, callback) => {
if (!isApplicationSuppliedSas) {
const renewalCallback = this._config.sharedAccessSignature ?
this._handleSASRenewal :
this._handleTokenCredentialRenewal;
this._renewalTimeout = setTimeout(renewalCallback.bind(this), this._renewalNumberOfMilliseconds);
}
callback(null, new results.Connected());
},
_onExit: (_callback) => {
if (this._renewalTimeout) {
clearTimeout(this._renewalTimeout);
}
},
connect: (callback) => callback(),
disconnect: (callback) => this._fsm.transition('disconnecting', null, callback),
send: (amqpMessage, deviceEndpoint, callback) => {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_003: [The message generated by the `send` method should have its “to” field set to the device ID passed as an argument.]*/
amqpMessage.to = deviceEndpoint;
if (!this._c2dLink) {
debug('attaching new sender link: ' + this._c2dEndpoint);
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_027: [The `send` method shall attach the C2D link if necessary.]*/
this._amqp.attachSenderLink(this._c2dEndpoint, null, (err, link) => {
if (err) {
debug('error trying to attach new sender link: ' + err.toString());
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_029: [The `send` method shall call its callback with an error if it fails to attach the C2D link.]*/
callback(err);
} else {
debug('sender link attached. sending message.');
this._c2dLink = link;
this._c2dLink.on('error', this._c2dErrorListener);
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_030: [The `send` method shall call the `send` method of the C2D link and pass it the Amqp request that it created.]*/
this._c2dLink.send(amqpMessage, callback);
}
});
} else {
debug('reusing existing sender link: ' + this._c2dEndpoint + '. sending message.');
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_028: [The `send` method shall reuse the C2D link if it is already attached.]*/
this._c2dLink.send(amqpMessage, callback);
}
},
getFeedbackReceiver: (callback) => {
if (this._feedbackReceiver) {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_035: [The `getFeedbackReceiver` method shall reuse the existing feedback receiver it if has already been attached.]*/
callback(null, this._feedbackReceiver);
} else {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_013: [The `getFeedbackReceiver` method shall request an `AmqpReceiver` object from the base AMQP transport for the `/messages/serviceBound/feedback` endpoint.]*/
this._amqp.attachReceiverLink(this._feedbackEndpoint, null, (err, link) => {
if (err) {
callback(err);
} else {
this._feedbackReceiver = new ServiceReceiver(link);
this._feedbackReceiver.on('error', this._feedbackErrorListener);
callback(null, this._feedbackReceiver);
}
});
}
},
getFileNotificationReceiver: (callback) => {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_038: [The `getFileNotificationReceiver` method shall reuse the existing feedback receiver it if has already been attached.]*/
if (this._fileNotificationReceiver) {
callback(null, this._fileNotificationReceiver);
} else {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_016: [The `getFileNotificationReceiver` method shall request an `AmqpReceiver` object from the base AMQP transport for the `/messages/serviceBound/filenotifications` endpoint.]*/
this._amqp.attachReceiverLink(this._fileNotificationEndpoint, null, (err, link) => {
if (err) {
callback(err);
} else {
this._fileNotificationReceiver = new ServiceReceiver(link);
this._fileNotificationReceiver.on('error', this._fileNotificationErrorListener);
callback(null, this._fileNotificationReceiver);
}
});
}
},
updateSharedAccessSignature: (updatedSAS, callback) => {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_031: [The `updateSharedAccessSignature` shall trigger a `putToken` call on the base transport if it is connected.]*/
const audience = SharedAccessSignature.parse(this._config.sharedAccessSignature.toString(), ['sr', 'sig', 'se']).sr;
this._amqp.putToken(audience, updatedSAS, callback);
},
updateAccessToken: (tokenValue, callback) => {
this._amqp.putToken(this._config.tokenScope, tokenValue, callback);
},
amqpError: (err) => {
this._fsm.transition('disconnecting', err);
}
},
disconnecting: {
_onEnter: (err, disconnectCallback) => {
let finalError: Error = err;
async.series([
(callback) => {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_021: [The `disconnect` method shall detach the C2D messaging link if it is attached.]*/
if (this._c2dLink) {
const tmpC2DLink = this._c2dLink;
this._c2dLink = undefined;
if (err) {
debug('force-detaching c2d links');
tmpC2DLink.forceDetach(err);
callback();
} else {
tmpC2DLink.detach((detachErr) => {
if (detachErr) {
debug('error detaching the c2d link: ' + detachErr.toString());
if (!finalError) {
finalError = translateError('error while detaching the c2d link when disconnecting', detachErr);
}
} else {
debug('c2d link detached.');
}
callback();
});
}
} else {
callback();
}
},
(callback) => {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_022: [The `disconnect` method shall detach the C2D feedback receiver link if it is attached.]*/
if (this._feedbackReceiver) {
const tmpFeedbackReceiver = this._feedbackReceiver;
this._feedbackReceiver = undefined;
if (err) {
tmpFeedbackReceiver.forceDetach(err);
tmpFeedbackReceiver.removeListener('error', this._feedbackErrorListener);
callback();
} else {
tmpFeedbackReceiver.detach((detachErr) => {
if (detachErr) {
debug('error detaching the message feedback link: ' + detachErr.toString());
} else {
debug('feedback link detached');
}
tmpFeedbackReceiver.removeListener('error', this._feedbackErrorListener);
if (!finalError && detachErr) {
finalError = translateError('error while detaching the message feedback link when disconnecting', detachErr);
}
callback();
});
}
} else {
callback();
}
},
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_023: [The `disconnect` method shall detach the file notification receiver link if it is attached.]*/
(callback) => {
if (this._fileNotificationReceiver) {
const tmpFileNotificationReceiver = this._fileNotificationReceiver;
this._fileNotificationReceiver = undefined;
if (err) {
tmpFileNotificationReceiver.forceDetach(err);
tmpFileNotificationReceiver.removeListener('error', this._fileNotificationErrorListener);
callback();
} else {
tmpFileNotificationReceiver.detach((detachErr) => {
if (detachErr) {
debug('error detaching the file upload notification link: ' + detachErr.toString());
} else {
debug('File notification link detached');
}
tmpFileNotificationReceiver.removeListener('error', this._fileNotificationErrorListener);
if (!finalError && detachErr) {
finalError = translateError('error while detaching the file upload notification link when disconnecting', detachErr);
}
callback();
});
}
} else {
callback();
}
},
(callback) => {
this._amqp.disconnect((disconnectErr) => {
if (disconnectErr) {
debug('error disconnecting the AMQP connection: ' + disconnectErr.toString());
} else {
debug('amqp connection successfully disconnected.');
}
if (!finalError && disconnectErr) {
finalError = translateError('error while disconnecting the AMQP connection', disconnectErr);
}
callback();
});
}
], () => {
/*Codes_SRS_NODE_IOTHUB_SERVICE_AMQP_16_024: [Any error generated by detaching a link should be passed as the single argument of the callback of the `disconnect` method.]*/
this._fsm.transition('disconnected', finalError, disconnectCallback);
});
},
'*': () => this._fsm.deferUntilTransition()
}
}
});
}