in device/transport/amqp/src/amqp_twin_client.ts [61:252]
/**
 * Creates the twin client and builds the state machine that manages the pair of
 * AMQP links (one sender, one receiver) used for twin operations.
 *
 * The state machine has four states:
 * - `detached`:  no links are attached. Twin operations first transition to `attaching`
 *                and are replayed once that transition completes without error.
 * - `attaching`: credentials are fetched, then the sender link and the receiver link are
 *                attached one after the other. Other inputs are deferred until the next transition.
 * - `attached`:  twin requests are sent through `_sendTwinRequest`.
 * - `detaching`: both links are detached and their listeners removed, then the machine
 *                goes back to `detached`.
 *
 * @param authenticationProvider Object whose `getDeviceCredentials` method supplies the
 *                               `deviceId` (and optional `moduleId`) used to build the twin endpoint.
 * @param client                 Transport object whose `attachSenderLink`, `attachReceiverLink`,
 *                               `detachSenderLink` and `detachReceiverLink` methods are used to manage the links.
 */
constructor(authenticationProvider: AuthenticationProvider, client: any) {
  super();
  this._client = client;
  this._authenticationProvider = authenticationProvider;
  this._senderLink = null;
  this._receiverLink = null;
  this._pendingTwinRequests = {};

  // Dispatches every message received on the receiver link to the right handler.
  this._messageHandler = (message: AmqpMessage): void => {
    //
    // The ONLY time we should see a message on the receiver link without a correlationId is if the message is a desired property delta update.
    //
    const correlationId: string = message.correlation_id;
    if (correlationId) {
      this._onResponseMessage(message);
    } else if (Object.prototype.hasOwnProperty.call(message, 'body')) {
      this._onDesiredPropertyDelta(message);
    } else {
      //
      // Can't be any message we know what to do with. Just drop it on the floor.
      //
      debug('malformed response message received from service: ' + JSON.stringify(message));
    }
  };

  // Shared by both links: any link error is routed into the state machine as a `handleLinkError` input.
  this._errorHandler = (err: Error): void => this._fsm.handle('handleLinkError', err);

  this._fsm = new machina.Fsm({
    namespace: 'amqp-twin-client',
    initialState: 'detached',
    states: {
      detached: {
        // Entered with (err, detachCallback). When a callback is supplied it receives the error (if any);
        // otherwise a non-null error is surfaced through the `error` event.
        _onEnter: (err, detachCallback) => {
          if (detachCallback) {
            detachCallback(err);
          } else {
            if (err) {
              this.emit('error', err);
            }
          }
        },
        getTwin: (callback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_007: [The `getTwin` method shall attach the sender link if it's not already attached.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_009: [THe `getTwin` method shall attach the receiver link if it's not already attached.]*/
          this._fsm.transition('attaching', (err) => {
            if (err) {
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_008: [If attaching the sender link fails, the `getTwin` method shall call its callback with the error that caused the failure.]*/
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_010: [If attaching the receiver link fails, the `getTwin` method shall call its callback with the error that caused the failure.]*/
              callback(err);
            } else {
              // Attach completed without error: replay the request.
              this._fsm.handle('getTwin', callback);
            }
          });
        },
        updateTwinReportedProperties: (patch, callback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_015: [The `updateTwinReportedProperties` method shall attach the sender link if it's not already attached.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_017: [THe `updateTwinReportedProperties` method shall attach the receiver link if it's not already attached.]*/
          this._fsm.transition('attaching', (err) => {
            if (err) {
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_016: [If attaching the sender link fails, the `updateTwinReportedProperties` method shall call its callback with the error that caused the failure.]*/
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_018: [If attaching the receiver link fails, the `updateTwinReportedProperties` method shall call its callback with the error that caused the failure.]*/
              callback(err);
            } else {
              // Attach completed without error: replay the request.
              this._fsm.handle('updateTwinReportedProperties', patch, callback);
            }
          });
        },
        enableTwinDesiredPropertiesUpdates: (callback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_023: [The `enableTwinDesiredPropertiesUpdates` method shall attach the sender link if it's not already attached.]*/
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_025: [The `enableTwinDesiredPropertiesUpdates` method shall attach the receiver link if it's not already attached.]*/
          this._fsm.transition('attaching', (err) => {
            if (err) {
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_024: [If attaching the sender link fails, the `enableTwinDesiredPropertiesUpdates` method shall call its callback with the error that caused the failure.]*/
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_026: [If attaching the receiver link fails, the `enableTwinDesiredPropertiesUpdates` method shall call its callback with the error that caused the failure.]*/
              callback(err);
            } else {
              // Attach completed without error: replay the request.
              this._fsm.handle('enableTwinDesiredPropertiesUpdates', callback);
            }
          });
        },
        /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_031: [The `disableTwinDesiredPropertiesUpdates` method shall call its callback immediately and with no arguments if the links are detached.]*/
        disableTwinDesiredPropertiesUpdates: (callback) => callback(),
        /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_004: [The `detach` method shall call its `callback` immediately if the links are already detached.]*/
        detach: (callback) => callback()
      },
      attaching: {
        // Entered with the callback to invoke once attaching has succeeded or failed.
        // NOTE(review): the nested attach callbacks below transition unconditionally; if `detach` or
        // `handleLinkError` moves the machine out of `attaching` first, they still run - confirm this is intended.
        _onEnter: (attachCallback) => {
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_007: [The `attach` method shall call the `getDeviceCredentials` method on the `authenticationProvider` object passed as an argument to the constructor to retrieve the device id.]*/
          this._authenticationProvider.getDeviceCredentials((err, credentials) => {
            if (err) {
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_008: [The `attach` method shall call its callback with an error if the call to `getDeviceCredentials` fails with an error.]*/
              this._fsm.transition('detached', err, attachCallback);
            } else {
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_007: [The endpoint argument for attachReceiverLink shall be `/device/<deviceId>/twin`.] */
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_18_001: [If a `moduleId` value was set in the device's connection string, the endpoint argument for `attachReceiverLink` shall be `/devices/<deviceId>/modules/<moduleId>/twin`]*/
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_009: [The endpoint argument for attachSenderLink shall be `/device/<deviceId>/twin`.] */
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_18_002: [If a `moduleId` value was set in the device's connection string, the endpoint argument for `attachSenderLink` shall be `/device/<deviceId>/modules/<moduleId>/twin`.]*/
              if (credentials.moduleId) {
                this._endpoint = endpoint.moduleTwinPath(credentials.deviceId, credentials.moduleId);
              } else {
                this._endpoint = endpoint.deviceTwinPath(credentials.deviceId);
              }
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_006: [When a listener is added for the `response` event, and the `post` event is NOT already subscribed, upstream and downstream links are established via calls to `attachReceiverLink` and `attachSenderLink`.] */
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_012: [When a listener is added for the `post` event, and the `response` event is NOT already subscribed, upstream and downstream links are established via calls to `attachReceiverLink` and `attachSenderLine`.] */
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_036: [The same correlationId shall be used for both the sender and receiver links.]*/
              const linkCorrelationId: string = uuid.v4().toString();
              // The sender link is attached first; the receiver link is only attempted once the sender succeeded.
              this._client.attachSenderLink( this._endpoint, this._generateTwinLinkProperties(linkCorrelationId), (senderLinkError?: Error, senderTransportObject?: any): void => {
                if (senderLinkError) {
                  /* Codes_SRS_NODE_DEVICE_AMQP_TWIN_06_022: [If an error occurs on establishing the upstream or downstream link then the `error` event shall be emitted.] */
                  this._fsm.transition('detached', senderLinkError, attachCallback);
                } else {
                  this._senderLink = senderTransportObject;
                  this._senderLink.on('error', this._errorHandler);
                  this._client.attachReceiverLink( this._endpoint, this._generateTwinLinkProperties(linkCorrelationId, true), (receiverLinkError?: Error, receiverTransportObject?: any): void => {
                    if (receiverLinkError) {
                      this._fsm.transition('detached', receiverLinkError, attachCallback);
                    } else {
                      this._receiverLink = receiverTransportObject;
                      this._receiverLink.on('message', this._messageHandler);
                      this._receiverLink.on('error', this._errorHandler);
                      this._fsm.transition('attached', attachCallback);
                    }
                  });
                }
              });
            }
          });
        },
        handleLinkError: (err, callback) => this._fsm.transition('detaching', err, callback),
        detach: (callback) => this._fsm.transition('detaching', null, callback),
        // Any other input is deferred until the machine leaves this state.
        '*': () => this._fsm.deferUntilTransition()
      },
      attached: {
        // Entered with the attach callback, which is invoked with no arguments to signal success.
        _onEnter: (callback) => {
          callback();
        },
        handleLinkError: (err) => {
          this._fsm.transition('detaching', err);
        },
        /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_011: [** The `getTwin` method shall send an `AmqpMessage` using the `SenderLink.send` method with the following annotations and properties:
        - `operation` annotation set to `GET`.
        - `resource` annotation set to `undefined`
        - `correlationId` property set to a uuid
        - `body` set to ` `.]*/
        getTwin: (callback) => this._sendTwinRequest(TwinMethod.GET, undefined, ' ', callback),
        /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_019: [The `updateTwinReportedProperties` method shall send an `AmqpMessage` using the `SenderLink.send` method with the following annotations and properties:
        - `operation` annotation set to `PATCH`.
        - `resource` annotation set to `/properties/reported`
        - `correlationId` property set to a uuid
        - `body` set to the stringified patch object.]*/
        updateTwinReportedProperties: (patch, callback) => this._sendTwinRequest(TwinMethod.PATCH, '/properties/reported', JSON.stringify(patch), callback),
        /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_027: [The `enableTwinDesiredPropertiesUpdates` method shall send an `AmqpMessage` using the `SenderLink.send` method with the following annotations and properties:
        - `operation` annotation set to `PUT`.
        - `resource` annotation set to `/notifications/twin/properties/desired`
        - `correlationId` property set to a uuid
        - `body` set to `undefined`.]*/
        // NOTE(review): the requirement above says the body is `undefined`, but a single space is passed here - confirm which is intended.
        enableTwinDesiredPropertiesUpdates: (callback) => this._sendTwinRequest(TwinMethod.PUT, '/notifications/twin/properties/desired', ' ', callback),
        /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_032: [The `disableTwinDesiredPropertiesUpdates` method shall send an `AmqpMessage` using the `SenderLink.send` method with the following annotations and properties:
        - `operation` annotation set to `DELETE`.
        - `resource` annotation set to `/notifications/twin/properties/desired`
        - `correlationId` property set to a uuid
        - `body` set to `undefined`.]*/
        // NOTE(review): the requirement above says the body is `undefined`, but a single space is passed here - confirm which is intended.
        disableTwinDesiredPropertiesUpdates: (callback) => this._sendTwinRequest(TwinMethod.DELETE, '/notifications/twin/properties/desired', ' ', callback),
        detach: (callback) => this._fsm.transition('detaching', null, callback)
      },
      detaching: {
        // Entered with (err, detachCallback): `err` is the error that triggered the detach (or null),
        // `detachCallback` is forwarded to the `detached` state once both detach calls have completed.
        _onEnter: (err, detachCallback) => {
          // Capture the link objects up front so the listeners are removed from the same objects
          // they were registered on, whatever happens to the fields while the detach is in flight.
          const senderLink = this._senderLink;
          const receiverLink = this._receiverLink;
          /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_005: [The `detach` method shall detach the links and call its `callback` with no arguments if the links are successfully detached.]*/
          this._client.detachSenderLink(this._endpoint, (detachSenderError: Error, _result?: any) => {
            // This state can be entered from `attaching` before the sender link exists (the field is
            // still null then), so only remove the listener when there is a link to remove it from.
            if (senderLink) {
              senderLink.removeListener('error', this._errorHandler);
            }
            if (detachSenderError) {
              debugErrors('we received an error for the detach of the upstream link during the disconnect. Moving on to the downstream link. Error=' + detachSenderError);
            }
            this._client.detachReceiverLink(this._endpoint, (detachReceiverError: Error, _result?: any) => {
              // Same as above: the receiver link is attached after the sender link, so it may not
              // exist yet when a sender link error or a `detach` interrupts the attach sequence.
              if (receiverLink) {
                receiverLink.removeListener('message', this._messageHandler);
                receiverLink.removeListener('error', this._errorHandler);
              }
              if (detachReceiverError) {
                debugErrors('we received an error for the detach of the downstream link during the disconnect. Error=' + detachReceiverError);
              }
              /*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_006: [The `detach` method shall call its `callback` with an `Error` if detaching either of the links fail.]*/
              // The error that caused the detach takes precedence over errors raised while detaching.
              const possibleError = err || detachSenderError || detachReceiverError;
              this._fsm.transition('detached', possibleError, detachCallback);
            });
          });
        },
        // Any input received while detaching is deferred until the machine leaves this state.
        '*': () => this._fsm.deferUntilTransition()
      }
    }
  });
}