in device/transport/amqp/src/amqp.ts [77:610]
constructor(authenticationProvider: AuthenticationProvider, baseClient?: BaseAmqpClient) {
  /*
   * Builds the AMQP device transport.
   *
   * authenticationProvider: source of the device credentials. When its `type` is `AuthenticationType.Token`
   *                         the transport subscribes to its `newTokenAvailable` event to renew the token.
   * baseClient:             optional AMQP base client to use; when omitted a `new BaseAmqpClient(false)` is created.
   *
   * The constructor only wires things together, it does not open a connection:
   *   - error events of the device-method and twin clients are re-emitted wrapped in transport-level errors;
   *   - listeners for the C2D and D2C links are prepared (they are attached to the links later by the state machine);
   *   - a state machine with the states disconnected / connecting / authenticating / authenticated / disconnecting
   *     is created; every operation of the transport is dispatched through it.
   */
  super();
  this._authenticationProvider = authenticationProvider;

  /*Codes_SRS_NODE_DEVICE_AMQP_16_056: [If the `authenticationProvider` object passed to the `Amqp` constructor has a `type` property which value is set to `AuthenticationType.Token` the `Amqp` constructor shall subscribe to the `newTokenAvailable` event of the `authenticationProvider` object.]*/
  if (this._authenticationProvider.type === AuthenticationType.Token) {
    // `this._fsm` is assigned further down in this constructor; it is only dereferenced when the event fires.
    (<any>this._authenticationProvider).on('newTokenAvailable', (newCredentials) => {
      /*Codes_SRS_NODE_DEVICE_AMQP_16_057: [If a `newTokenAvailable` event is emitted by the `authenticationProvider` object passed as an argument to the constructor, a `putToken` operation shall be initiated with the new shared access signature if the amqp connection is already connected.]*/
      this._fsm.handle('updateSharedAccessSignature', newCredentials.sharedAccessSignature, (err) => {
        if (err) {
          /*Codes_SRS_NODE_DEVICE_AMQP_16_058: [If the `putToken` operation initiated upon receiving a `newTokenAvailable` event fails, a `disconnect` event shall be emitted with the error from the failed `putToken` operation.]*/
          debugErrors('Error updating the shared access signature: ' + getErrorName(err));
          this._fsm.handle('disconnect', () => {
            debug('emitting the disconnect event in response to a update signature failure');
            this.emit('disconnect', err);
          });
        }
      });
    });
  }

  this._amqp = baseClient || new BaseAmqpClient(false);
  // Connection-level disconnects reported by the base client are routed through the state machine;
  // the `disconnect` event is emitted once the state machine invokes the callback passed here.
  this._amqp.setDisconnectHandler((err) => {
    if (err) {
      debugErrors('disconnected event handler: ' + getErrorName(err));
    } else {
      debug('disconnected event handler: no error');
    }
    this._fsm.handle('amqpConnectionClosed', err, () => {
      this.emit('disconnect', getTranslatedError(err, 'AMQP client disconnected'));
    });
  });

  this._deviceMethodClient = new AmqpDeviceMethodClient(this._authenticationProvider, this._amqp);
  /*Codes_SRS_NODE_DEVICE_AMQP_16_041: [Any `error` event received on any of the links used for device methods shall trigger the emission of an `error` event by the transport, with an argument that is a `MethodsDetachedError` object with the `innerError` property set to that error.]*/
  this._deviceMethodClient.on('error', (err) => {
    const methodsError = new errors.DeviceMethodsDetachedError('Device Methods AMQP links failed');
    methodsError.innerError = err;
    this.emit('error', methodsError);
  });

  this._twinClient = new AmqpTwinClient(this._authenticationProvider, this._amqp);
  /*Codes_SRS_NODE_DEVICE_AMQP_16_048: [Any `error` event received on any of the links used for twin shall trigger the emission of an `error` event by the transport, with an argument that is a `TwinDetachedError` object with the `innerError` property set to that error.]*/
  this._twinClient.on('error', (err) => {
    const twinError = new errors.TwinDetachedError('Twin AMQP links failed');
    twinError.innerError = err;
    this.emit('error', twinError);
  });
  // Desired-property patches are forwarded unchanged.
  this._twinClient.on('twinDesiredPropertiesUpdate', (patch) => this.emit('twinDesiredPropertiesUpdate', patch));

  /*Codes_SRS_NODE_DEVICE_AMQP_16_034: [Any `error` event received on the C2D link shall trigger the emission of an `error` event by the transport, with an argument that is a `C2DDetachedError` object with the `innerError` property set to that error.]*/
  this._c2dErrorListener = (err) => {
    debugErrors('Error on the C2D link: ' + getErrorName(err));
    const c2dError = new errors.CloudToDeviceDetachedError('Cloud-to-device AMQP link failed');
    c2dError.innerError = err;
    this.emit('error', c2dError);
  };

  // Dispatches a message received on the receiver link either as an `inputMessage` or as a `message` event,
  // depending on `_messageEventName`, which is set while connecting.
  this._c2dMessageListener = (msg: AmqpMessage) => {
    let inputName: string;
    if (msg.message_annotations) {
      inputName = msg.message_annotations['x-opt-input-name'];
    }
    if (this._messageEventName === 'inputMessage') {
      /*Codes_SRS_NODE_DEVICE_AMQP_18_014: [If `amqp` receives a message on the input message link, it shall emit an "inputMessage" event with the value of the annotation property "x-opt-input-name" as the first parameter and the agnostic message as the second parameter.]*/
      debug('inputMessage received on C2D link, emitting \'inputMessage\'');
      this.emit('inputMessage', inputName, AmqpMessage.toMessage(msg));
    } else {
      /*Codes_SRS_NODE_DEVICE_AMQP_18_013: [If `amqp` receives a message on the C2D link, it shall emit a "message" event with the message as the event parameter.]*/
      debug('message received on C2D link, emitting \'message\'');
      this.emit('message', AmqpMessage.toMessage(msg));
    }
  };

  this._d2cErrorListener = (err) => {
    debugErrors('Error on the D2C link: ' + getErrorName(err));
    // Forget the link so that the next `sendEvent` attaches a new one.
    this._d2cLink = null;
    // we don't really care because we can reattach the link every time we send and surface the error at that time.
  };

  // Internal emitter used to fan out the result of a single link attach operation to every caller waiting for it.
  this._amqpLinkEmitter = new EventEmitter();
  this._amqpLinkEmitter.setMaxListeners(Infinity);

  this._fsm = new machina.Fsm({
    initialState: 'disconnected',
    states: {
      disconnected: {
        // `err` / `callback` are the extra arguments given to `transition('disconnected', ...)`.
        _onEnter: (err, callback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_16_083: [When the `amqp` client is disconnected and if token-based authentication is used the `stop` method of the `AuthenticationProvider` shall be called.]*/
          if (this._authenticationProvider.type === AuthenticationType.Token) {
            (this._authenticationProvider as SharedAccessKeyAuthenticationProvider).stop();
          }
          if (callback) {
            if (err) {
              callback(err);
            } else {
              callback(null, new results.Disconnected());
            }
          } else if (err) {
            // Nobody is waiting for the outcome: surface the error as an event instead.
            this.emit('error', err);
          }
        },
        connect: (connectCallback) => this._fsm.transition('connecting', connectCallback),
        disconnect: (disconnectCallback) => {
          if (disconnectCallback) {
            disconnectCallback(null, new results.Disconnected());
          }
        },
        sendEvent: (amqpMessage, sendCallback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_16_024: [The `sendEvent` method shall connect and authenticate the transport if necessary.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_18_005: [The `sendOutputEvent` method shall connect and authenticate the transport if necessary.]*/
          this._fsm.handle('connect', (err, _result) => {
            if (err) {
              sendCallback(err);
            } else {
              this._fsm.handle('sendEvent', amqpMessage, sendCallback);
            }
          });
        },
        updateSharedAccessSignature: (_token, callback) => {
          // nothing to do here: the SAS has been updated in the config object.
          callback(null, new results.SharedAccessSignatureUpdated(false));
        },
        getTwin: (callback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_16_059: [The `getTwin` method shall connect and authenticate the transport if it is disconnected.]*/
          this._fsm.handle('connect', (err, _result) => {
            if (err) {
              /*Codes_SRS_NODE_DEVICE_AMQP_16_060: [The `getTwin` method shall call its callback with an error if connecting fails.]*/
              /*Codes_SRS_NODE_DEVICE_AMQP_16_061: [The `getTwin` method shall call its callback with an error if authenticating fails.]*/
              callback(err);
            } else {
              this._fsm.handle('getTwin', callback);
            }
          });
        },
        updateTwinReportedProperties: (patch, callback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_16_065: [The `updateTwinReportedProperties` method shall connect and authenticate the transport if it is disconnected.]*/
          this._fsm.handle('connect', (err, _result) => {
            if (err) {
              /*Codes_SRS_NODE_DEVICE_AMQP_16_066: [The `updateTwinReportedProperties` method shall call its callback with an error if connecting fails.]*/
              /*Codes_SRS_NODE_DEVICE_AMQP_16_067: [The `updateTwinReportedProperties` method shall call its callback with an error if authenticating fails.]*/
              callback(err);
            } else {
              this._fsm.handle('updateTwinReportedProperties', patch, callback);
            }
          });
        },
        enableTwinDesiredPropertiesUpdates: (callback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_16_071: [The `enableTwinDesiredPropertiesUpdates` method shall connect and authenticate the transport if it is disconnected.]*/
          this._fsm.handle('connect', (err, _result) => {
            if (err) {
              /*Codes_SRS_NODE_DEVICE_AMQP_16_072: [The `enableTwinDesiredPropertiesUpdates` method shall call its callback with an error if connecting fails.]*/
              /*Codes_SRS_NODE_DEVICE_AMQP_16_073: [The `enableTwinDesiredPropertiesUpdates` method shall call its callback with an error if authenticating fails.]*/
              callback(err);
            } else {
              this._fsm.handle('enableTwinDesiredPropertiesUpdates', callback);
            }
          });
        },
        disableTwinDesiredPropertiesUpdates: (callback) => callback(),
        /*Codes_SRS_NODE_DEVICE_AMQP_16_031: [The `enableC2D` method shall connect and authenticate the transport if it is disconnected.]*/
        enableC2D: (callback) => {
          this._fsm.handle('connect', (err, _result) => {
            if (err) {
              /*Codes_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/
              callback(err);
            } else {
              this._fsm.handle('enableC2D', callback);
            }
          });
        },
        /*Codes_SRS_NODE_DEVICE_AMQP_16_037: [The `disableC2D` method shall call its `callback` immediately if the transport is already disconnected.]*/
        disableC2D: (callback) => {
          // if we are disconnected the C2D link is already detached.
          callback();
        },
        /*Codes_SRS_NODE_DEVICE_AMQP_16_038: [The `enableMethods` method shall connect and authenticate the transport if it is disconnected.]*/
        enableMethods: (callback) => {
          this._fsm.handle('connect', (err, _result) => {
            if (err) {
              /*Codes_SRS_NODE_DEVICE_AMQP_16_040: [The `enableMethods` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach method links.]*/
              callback(err);
            } else {
              this._fsm.handle('enableMethods', callback);
            }
          });
        },
        /*Codes_SRS_NODE_DEVICE_AMQP_16_044: [The `disableMethods` method shall call its `callback` immediately if the transport is already disconnected.]*/
        disableMethods: (callback) => {
          // if we are disconnected the device method links are already detached.
          callback();
        },
        amqpConnectionClosed: (err, callback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_16_080: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is disconnected, the call shall be ignored.]*/
          debugErrors('ignoring amqpConnectionClosed because already disconnected: ' + getErrorName(err));
          callback();
        }
      },
      connecting: {
        _onEnter: (connectCallback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_16_054: [The `connect` method shall get the current credentials by calling `getDeviceCredentials` on the `AuthenticationProvider` object passed to the constructor as an argument.]*/
          this._authenticationProvider.getDeviceCredentials((err, credentials) => {
            if (err) {
              /*Codes_SRS_NODE_DEVICE_AMQP_16_055: [The `connect` method shall call its callback with an error if the callback passed to the `getDeviceCredentials` method is called with an error.]*/
              this._fsm.transition('disconnected', translateError('AMQP Transport: Could not get credentials', err), connectCallback);
            } else {
              // Module identities use the module endpoints and surface incoming messages as `inputMessage`;
              // device identities use the device endpoints and surface them as `message`.
              if (credentials.moduleId) {
                this._c2dEndpoint = endpoint.moduleMessagePath(credentials.deviceId, credentials.moduleId);
                this._d2cEndpoint = endpoint.moduleEventPath(credentials.deviceId, credentials.moduleId);
                this._messageEventName = 'inputMessage';
              } else {
                this._c2dEndpoint = endpoint.deviceMessagePath(credentials.deviceId);
                this._d2cEndpoint = endpoint.deviceEventPath(credentials.deviceId);
                this._messageEventName = 'message';
              }

              /*Codes_SRS_NODE_DEVICE_AMQP_41_001: [ The AMQP transport should use the productInfo string in the `options` object if present ]*/
              /*Codes_SRS_NODE_DEVICE_AMQP_41_002: [ The connect method shall set the productInfo on the options object when calling the underlying connection object's connect method if it was supplied. ]*/
              const customInfo = (this._options && this._options.productInfo) ? this._options.productInfo : '';
              getUserAgentString(customInfo, (userAgentString) => {
                const config: AmqpBaseTransportConfig = {
                  uri: this._getConnectionUri(credentials),
                  sslOptions: credentials.x509,
                  userAgentString: userAgentString
                };

                if (this._options) {
                  // Make sure there is an object to attach the optional TLS settings to.
                  config.sslOptions = config.sslOptions || {};
                  /*Codes_SRS_NODE_DEVICE_AMQP_13_002: [ The connect method shall set the CA cert on the options object when calling the underlying connection object's connect method if it was supplied. ]*/
                  if (this._options.ca) {
                    config.sslOptions.ca = this._options.ca;
                  }
                  /*Codes_SRS_NODE_DEVICE_AMQP_99_084: [ The connect method shall set the HTTPS agent on the options object when calling the underlying connection object's connect method if it was supplied. ]*/
                  if (this._options.amqp && this._options.amqp.webSocketAgent) {
                    config.sslOptions.agent = this._options.amqp.webSocketAgent;
                  }
                }

                this._amqp.connect(config, (err, connectResult) => {
                  if (err) {
                    this._fsm.transition('disconnected', translateError('AMQP Transport: Could not connect', err), connectCallback);
                  } else {
                    this._fsm.transition('authenticating', connectResult, connectCallback);
                  }
                });
              });
            }
          });
        },
        disconnect: (disconnectCallback) => this._fsm.transition('disconnecting', null, disconnectCallback),
        updateSharedAccessSignature: (_token, callback) => {
          callback(null, new results.SharedAccessSignatureUpdated(false));
        },
        /*Codes_SRS_NODE_DEVICE_AMQP_16_081: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connecting or authenticating, the connection shall be stopped and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error.]*/
        amqpConnectionClosed: (err, callback) => this._fsm.transition('disconnecting', err, callback),
        // Any other operation is replayed once the state machine has left this state.
        '*': () => this._fsm.deferUntilTransition()
      },
      authenticating: {
        _onEnter: (connectResult, connectCallback) => {
          if (this._authenticationProvider.type === AuthenticationType.X509) {
            // X509: no CBS / putToken step is performed, go straight to the authenticated state.
            this._fsm.transition('authenticated', connectResult, connectCallback);
          } else {
            /*Codes_SRS_NODE_DEVICE_AMQP_06_005: [If x509 authentication is NOT being utilized then `initializeCBS` shall be invoked.]*/
            this._amqp.initializeCBS((err) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_AMQP_06_008: [If `initializeCBS` is not successful then the client will be disconnected.]*/
                this._fsm.transition('disconnecting', getTranslatedError(err, 'AMQP Transport: Could not initialize CBS'), connectCallback);
              } else {
                this._authenticationProvider.getDeviceCredentials((err, credentials) => {
                  if (err) {
                    this._fsm.transition('disconnecting', getTranslatedError(err, 'AMQP Transport: Could not get credentials from AuthenticationProvider'), connectCallback);
                  } else {
                    /*Codes_SRS_NODE_DEVICE_AMQP_06_006: [If `initializeCBS` is successful, `putToken` shall be invoked with the first parameter `audience`, created from the `sr` of the shared access signature, the actual shared access signature, and a callback.]*/
                    this._amqp.putToken(SharedAccessSignature.parse(credentials.sharedAccessSignature, ['sr', 'sig', 'se']).sr, credentials.sharedAccessSignature, (err) => {
                      if (err) {
                        /*Codes_SRS_NODE_DEVICE_AMQP_06_009: [If `putToken` is not successful then the client will be disconnected.]*/
                        this._fsm.transition('disconnecting', getTranslatedError(err, 'AMQP Transport: Could not authorize with putToken'), connectCallback);
                      } else {
                        this._fsm.transition('authenticated', connectResult, connectCallback);
                      }
                    });
                  }
                });
              }
            });
          }
        },
        disconnect: (disconnectCallback) => this._fsm.transition('disconnecting', null, disconnectCallback),
        /*Codes_SRS_NODE_DEVICE_AMQP_16_081: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connecting or authenticating, the connection shall be stopped and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error.]*/
        amqpConnectionClosed: (err, callback) => this._fsm.transition('disconnecting', err, callback),
        // Any other operation is replayed once the state machine has left this state.
        '*': () => this._fsm.deferUntilTransition()
      },
      authenticated: {
        _onEnter: (connectResult, connectCallback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_41_005: [ Once the amqp client is authenticated it will emit a `connected` event ]*/
          this.emit('connected');
          // The callback is treated as optional, like on the failure path (`disconnected._onEnter`)
          // and in the `disconnect` handlers, so a callback-less connect does not throw on success.
          if (connectCallback) {
            connectCallback(null, connectResult);
          }
        },
        connect: (connectCallback) => {
          // Already connected: report success right away (callback optional, see `_onEnter`).
          if (connectCallback) {
            connectCallback(null, new results.Connected());
          }
        },
        disconnect: (disconnectCallback) => this._fsm.transition('disconnecting', null, disconnectCallback),
        sendEvent: (amqpMessage, sendCallback) => {
          amqpMessage.to = this._d2cEndpoint;

          /*Codes_SRS_NODE_DEVICE_AMQP_16_025: [The `sendEvent` method shall create and attach the d2c link if necessary.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_18_006: [The `sendOutputEvent` method shall create and attach the d2c link if necessary.]*/
          if (this._d2cLink) {
            debug('using existing d2c link');
            this._d2cLink.send(amqpMessage, handleResult('AMQP Transport: Could not send', sendCallback));
          } else {
            debug('waiting for a D2C link');
            this._amqpLinkEmitter.once('senderLinkAttached', (err) => {
              if (err) {
                handleResult('AMQP Transport: Could not attach sender link', sendCallback)(err);
              } else {
                this._d2cLink.send(amqpMessage, handleResult('AMQP Transport: Could not send', sendCallback));
              }
            });

            // If we were the first listener for the senderLinkAttached event, we should create a sender link
            // If we are not the first listener, we know that attachSenderLink was already called and we shouldn't call it again.
            // Doing so would create unnecessary sender links.
            if (this._amqpLinkEmitter.listenerCount('senderLinkAttached') === 1) {
              debug('attaching D2C link');
              this._amqp.attachSenderLink(this._d2cEndpoint, null, (err, link) => {
                if (err) {
                  debugErrors('error creating a D2C link: ' + getErrorName(err));
                } else {
                  debug('got a new D2C link');
                  this._d2cLink = link;
                  this._d2cLink.on('error', this._d2cErrorListener);
                }
                // Wake up every sendEvent call that was waiting for the link (with the error, if any).
                this._amqpLinkEmitter.emit('senderLinkAttached', err);
              });
            }
          }
        },
        updateSharedAccessSignature: (sharedAccessSignature, updateSasCallback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_06_010: [If the AMQP connection is established, the `updateSharedAccessSignature` method shall call the amqp transport `putToken` method with the first parameter `audience`, created from the `sr` of the shared access signature, the actual shared access signature, and a callback.]*/
          this._amqp.putToken(SharedAccessSignature.parse(sharedAccessSignature, ['sr', 'sig', 'se']).sr, sharedAccessSignature, (err) => {
            if (err) {
              // The base connection is closed before the failure is reported to the caller.
              this._amqp.disconnect(() => {
                updateSasCallback(getTranslatedError(err, 'AMQP Transport: Could not authorize with puttoken'));
              });
            } else {
              /*Codes_SRS_NODE_DEVICE_AMQP_06_011: [The `updateSharedAccessSignature` method shall call the `done` callback with a null error object and a SharedAccessSignatureUpdated object as a result, indicating the client does NOT need to reestablish the transport connection.]*/
              updateSasCallback(null, new results.SharedAccessSignatureUpdated(false));
            }
          });
        },
        /*Codes_SRS_NODE_DEVICE_AMQP_16_062: [The `getTwin` method shall call the `getTwin` method on the `AmqpTwinClient` instance created by the constructor.]*/
        /*Codes_SRS_NODE_DEVICE_AMQP_16_063: [The `getTwin` method shall call its callback with and error if the call to `AmqpTwinClient.getTwin` fails.]*/
        /*Codes_SRS_NODE_DEVICE_AMQP_16_064: [The `getTwin` method shall call its callback with a `null` error parameter and the result of the `AmqpTwinClient.getTwin` method if it succeeds.]*/
        getTwin: (callback) => this._twinClient.getTwin(handleResult('could not get twin', callback)),
        /*Codes_SRS_NODE_DEVICE_AMQP_16_068: [The `updateTwinReportedProperties` method shall call the `updateTwinReportedProperties` method on the `AmqpTwinClient` instance created by the constructor.]*/
        /*Codes_SRS_NODE_DEVICE_AMQP_16_069: [The `updateTwinReportedProperties` method shall call its callback with and error if the call to `AmqpTwinClient.updateTwinReportedProperties` fails.]*/
        /*Codes_SRS_NODE_DEVICE_AMQP_16_070: [The `updateTwinReportedProperties` method shall call its callback with a `null` error parameter and the result of the `AmqpTwinClient.updateTwinReportedProperties` method if it succeeds.]*/
        updateTwinReportedProperties: (patch, callback) => this._twinClient.updateTwinReportedProperties(patch, handleResult('could not update twin reported properties', callback)),
        /*Codes_SRS_NODE_DEVICE_AMQP_16_074: [The `enableTwinDesiredPropertiesUpdates` method shall call the `enableTwinDesiredPropertiesUpdates` method on the `AmqpTwinClient` instance created by the constructor.]*/
        /*Codes_SRS_NODE_DEVICE_AMQP_16_075: [The `enableTwinDesiredPropertiesUpdates` method shall call its callback with and error if the call to `AmqpTwinClient.enableTwinDesiredPropertiesUpdates` fails.]*/
        /*Codes_SRS_NODE_DEVICE_AMQP_16_076: [The `enableTwinDesiredPropertiesUpdates` method shall call its callback with no arguments if the call to `AmqpTwinClient.enableTwinDesiredPropertiesUpdates` succeeds.]*/
        enableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.enableTwinDesiredPropertiesUpdates(handleResult('could not enable twin desired properties updates', callback)),
        /*Codes_SRS_NODE_DEVICE_AMQP_16_077: [The `disableTwinDesiredPropertiesUpdates` method shall call the `disableTwinDesiredPropertiesUpdates` method on the `AmqpTwinClient` instance created by the constructor.]*/
        /*Codes_SRS_NODE_DEVICE_AMQP_16_078: [The `disableTwinDesiredPropertiesUpdates` method shall call its callback with and error if the call to `AmqpTwinClient.disableTwinDesiredPropertiesUpdates` fails.]*/
        /*Codes_SRS_NODE_DEVICE_AMQP_16_079: [The `disableTwinDesiredPropertiesUpdates` method shall call its callback no arguments if the call to `AmqpTwinClient.disableTwinDesiredPropertiesUpdates` succeeds.]*/
        disableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.disableTwinDesiredPropertiesUpdates(handleResult('could not disable twin desired properties updates', callback)),
        enableC2D: (callback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_41_003: [The `enableC2D` method shall attach the C2D link only if it is not already attached.] */
          if (this._c2dLink) {
            debug('C2D link already attached, doing nothing....');
            process.nextTick(callback);
          } else {
            debug('waiting for a C2D link');
            this._amqpLinkEmitter.once('receiverLinkAttached', (err) => {
              if (err) {
                /*Codes_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/
                handleResult('AMQP Transport: Could not attach receiver link', callback)(err);
              } else {
                callback();
              }
            });

            // If we were the first listener for the receiverLinkAttached event, we should create a receiver link
            // If we are not the first listener, we know that attachReceiverLink was already called and we shouldn't call it again.
            // Doing so would create unnecessary receiver links.
            if (this._amqpLinkEmitter.listenerCount('receiverLinkAttached') === 1) {
              debug('attaching C2D link');
              this._amqp.attachReceiverLink(this._c2dEndpoint, null, (err, receiverLink) => {
                if (err) {
                  debugErrors('error creating a C2D link: ' + getErrorName(err));
                } else {
                  /*Codes_SRS_NODE_DEVICE_AMQP_16_032: [The `enableC2D` method shall attach the C2D link and call its `callback` once it is successfully attached.]*/
                  debug('C2D link created and attached successfully');
                  this._c2dLink = receiverLink;
                  this._c2dLink.on('error', this._c2dErrorListener);
                  this._c2dLink.on('message', this._c2dMessageListener);
                }
                // Wake up every enableC2D call that was waiting for the link (with the error, if any).
                this._amqpLinkEmitter.emit('receiverLinkAttached', err);
              });
            }
          }
        },
        disableC2D: (callback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_41_004: [The `disableC2D` method shall detach the C2D link only if it is already attached.] */
          if (this._c2dLink) {
            /*Codes_SRS_NODE_DEVICE_AMQP_16_035: [The `disableC2D` method shall call `detach` on the C2D link and call its callback when it is successfully detached.]*/
            /*Codes_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/
            this._stopC2DListener(undefined, callback);
          } else {
            debug('C2D link already detached, doing nothing...');
            callback();
          }
        },
        enableMethods: (callback) => {
          // deviceMethodClient already checks if the link is attached or not, so we do not need to check that here like we do with C2D
          /*Codes_SRS_NODE_DEVICE_AMQP_16_039: [The `enableMethods` method shall attach the method links and call its `callback` once these are successfully attached.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_16_040: [The `enableMethods` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach method links.]*/
          this._deviceMethodClient.attach(callback);
        },
        disableMethods: (callback) => {
          // deviceMethodClient already checks if the link is attached or not, so we do not need to check that here like we do with C2D
          /*Codes_SRS_NODE_DEVICE_AMQP_16_042: [The `disableMethods` method shall call `detach` on the device method links and call its callback when these are successfully detached.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_16_043: [The `disableMethods` method shall call its `callback` with an `Error` if it fails to detach the device method links.]*/
          this._deviceMethodClient.detach(callback);
        },
        /*Codes_SRS_NODE_DEVICE_AMQP_16_082: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connected, the connection shall be disconnected and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error.]*/
        amqpConnectionClosed: (err, callback) => this._fsm.transition('disconnecting', err, callback)
      },
      disconnecting: {
        // `err` is the error that caused the disconnection (null/undefined for a requested disconnect).
        // Teardown steps run in sequence and never abort the sequence: only the first error encountered
        // (starting with `err` itself) is kept in `finalError` and handed to the `disconnected` state.
        _onEnter: (err, disconnectCallback) => {
          let finalError = err;
          async.series([
            // 1. device method links
            (callback) => {
              if (err) {
                debugErrors('force-detaching device methods links because: ' + getErrorName(err));
                this._deviceMethodClient.forceDetach();
                callback();
              } else {
                this._deviceMethodClient.detach((detachErr) => {
                  if (detachErr) {
                    debugErrors('error detaching methods links: ' + detachErr);
                    if (!finalError) {
                      finalError = translateError('error while detaching the methods links when disconnecting', detachErr);
                    }
                  } else {
                    debug('device methods links detached.');
                  }
                  callback();
                });
              }
            },
            // 2. twin links
            (callback) => {
              this._twinClient.detach((twinDetachError) => {
                if (twinDetachError) {
                  debugErrors('error detaching twin links: ' + twinDetachError);
                  if (!finalError) {
                    finalError = translateError('error while detaching twin links', twinDetachError);
                  }
                } else {
                  debug('device twin links detached');
                }
                callback();
              });
            },
            // 3. D2C (sender) link
            (callback) => {
              // NOTE(review): listeners still waiting for 'senderLinkAttached' are removed without being invoked,
              // so a sendEvent call that was waiting for the link is not called back from here - confirm this is intended.
              this._amqpLinkEmitter.removeAllListeners('senderLinkAttached');
              if (this._d2cLink) {
                // Clear the member first so nothing else uses the link while it is being detached.
                const tmpD2CLink = this._d2cLink;
                this._d2cLink = undefined;

                if (err) {
                  /*Codes_SRS_NODE_DEVICE_AMQP_16_023: [The `disconnect` method shall forcefully detach all attached links if a connection error is the causing the transport to be disconnected.]*/
                  tmpD2CLink.forceDetach(err);
                  tmpD2CLink.removeListener('error', this._d2cErrorListener);
                }
                // NOTE(review): `detach` is also called after `forceDetach`; presumably a no-op on an already
                // detached link - confirm against the sender link implementation.
                tmpD2CLink.detach((detachErr) => {
                  if (detachErr) {
                    debugErrors('error detaching the D2C link: ' + detachErr);
                  } else {
                    debug('D2C link detached');
                  }
                  tmpD2CLink.removeListener('error', this._d2cErrorListener);
                  if (!finalError && detachErr) {
                    finalError = translateError('error while detaching the D2C link when disconnecting', detachErr);
                  }
                  callback();
                });
              } else {
                callback();
              }
            },
            // 4. C2D (receiver) link
            (callback) => {
              // NOTE(review): same remark as for 'senderLinkAttached' - pending enableC2D callbacks are dropped here.
              this._amqpLinkEmitter.removeAllListeners('receiverLinkAttached');
              if (this._c2dLink) {
                /*Codes_SRS_NODE_DEVICE_AMQP_16_022: [The `disconnect` method shall detach all attached links.]*/
                this._stopC2DListener(err, (detachErr) => {
                  if (!finalError && detachErr) {
                    finalError = translateError('error while detaching the C2D link when disconnecting', detachErr);
                  }
                  callback();
                });
              } else {
                callback();
              }
            },
            // 5. the AMQP connection itself
            (callback) => {
              this._amqp.disconnect((disconnectErr) => {
                if (disconnectErr) {
                  debugErrors('error disconnecting the AMQP connection: ' + disconnectErr);
                } else {
                  debug('amqp connection successfully disconnected.');
                }
                if (!finalError && disconnectErr) {
                  finalError = translateError('error while disconnecting the AMQP connection', disconnectErr);
                }
                callback();
              });
            }
          ], () => {
            /*Codes_SRS_NODE_DEVICE_AMQP_16_010: [The `done` callback method passed in argument shall be called when disconnected.]*/
            /*Codes_SRS_NODE_DEVICE_AMQP_16_011: [The `done` callback method passed in argument shall be called with an error object if disconnecting fails.]*/
            this._fsm.transition('disconnected', finalError, disconnectCallback);
          });
        },
        // Every operation requested while disconnecting is replayed in the next state.
        '*': (_connectCallback) => this._fsm.deferUntilTransition()
      },
    }
  });

  // Trace every state change of the state machine.
  this._fsm.on('transition', (data) => {
    debug(data.fromState + ' -> ' + data.toState + ' (' + data.action + ')');
  });
}