in device/transport/amqp/src/amqp_device_method_client.ts [43:222]
/**
 * Builds the client and the state machine that drives the lifecycle of the
 * sender/receiver link pair used for device methods.
 *
 * The state machine has four states:
 * - `detached`: no links; `attach` starts the attach sequence.
 * - `attaching`: credentials are fetched, then the sender link and the receiver link are attached in that order.
 * - `attached`: both links are up; method responses can be sent.
 * - `detaching`: both links are being detached; every other input is deferred until `detached`.
 *
 * @param authenticationProvider Provider whose `getDeviceCredentials` supplies the `deviceId` (and optional `moduleId`) used to build the method endpoint.
 * @param amqpClient AMQP client used to attach the links and to send method responses.
 */
constructor(authenticationProvider: AuthenticationProvider, amqpClient: BaseAmqpClient) {
  super();
  /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_003: [The `AmqpDeviceMethodClient` shall inherit from the `EventEmitter` class.]*/
  this._authenticationProvider = authenticationProvider;
  this._amqpClient = amqpClient;
  this._fsm = new machina.Fsm({
    namespace: 'amqp-device-method-client',
    initialState: 'detached',
    states: {
      detached: {
        // Entered with (callback, err) forwarded by whichever state triggered the transition.
        _onEnter: (callback, err) => {
          // Drop the link references so that later states never act on stale links.
          this._senderLink = undefined;
          this._receiverLink = undefined;
          if (callback) {
            // A caller is waiting (attach/detach): report the outcome to it directly.
            callback(err);
          } else {
            // Nobody is waiting: an error here came from a link and is surfaced as an 'error' event.
            if (err) {
              debugErrors('detached with error: ' + getErrorName(err));
              /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_015: [The `AmqpDeviceMethodClient` object shall forward any error received on a link to any listening client in an `error` event.]*/
              this.emit('error', err);
            }
          }
        },
        attach: (callback) => {
          this._fsm.transition('attaching', callback);
        },
        /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_023: [The `detach` method shall call the callback with no arguments if the links are properly detached.]*/
        detach: (callback) => callback(),
        /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_025: [The `forceDetach` method shall immediately return if all links are already detached.]*/
        forceDetach: () => { return; },
        /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_026: [The `sendMethodResponse` shall fail with a `NotConnectedError` if it is called while the links are detached.]*/
        sendMethodResponse: (_response, callback) => callback(new errors.NotConnectedError('Method Links were detached - the service already considers this method failed')),
        /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_006: [The `onDeviceMethod` method shall save the `callback` argument so that it is called when the corresponding method call is received.]*/
        onDeviceMethod: (methodName, methodCallback) => {
          debug('attaching callback for method: ' + methodName + ' while detached.');
          // The handler is registered as a listener on this emitter; it fires when a 'method_<name>' event is emitted.
          this.on('method_' + methodName, methodCallback);
        }
      },
      attaching: {
        // Entered with the callback passed to `attach`; it is forwarded to the next state on success or failure.
        _onEnter: (attachCallback) => {
          /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_027: [The `attach` method shall call the `getDeviceCredentials` method on the `authenticationProvider` object passed as an argument to the constructor to retrieve the device id.]*/
          this._authenticationProvider.getDeviceCredentials((err, credentials) => {
            if (err) {
              /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_028: [The `attach` method shall call its callback with an error if the call to `getDeviceCredentials` fails with an error.]*/
              // No link exists yet, so go straight to 'detached' instead of 'detaching'.
              this._fsm.transition('detached', attachCallback, err);
            } else {
              /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_017: [The endpoint used to for the sender and receiver link shall be `/devices/<device-id>/methods/devicebound`.]*/
              /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_18_001: [If a `moduleId` value was set in the device's connection string, The endpoint used to for the sender and receiver link shall be `/devices/<deviceId>/modules/<moduleId>/methods/devicebound`.]*/
              if (credentials.moduleId) {
                this._methodEndpoint = endpoint.moduleMethodPath(credentials.deviceId, credentials.moduleId);
              } else {
                this._methodEndpoint = endpoint.deviceMethodPath(credentials.deviceId);
              }
              /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_014: [** The `AmqpDeviceMethodClient` object shall set 2 properties of any AMQP link that it create:
              - `com.microsoft:api-version` shall be set to the current API version in use.
              - `com.microsoft:channel-correlation-id` shall be set to the string "methods:" followed by a guid.]*/
              // The same options object (and therefore the same correlation id) is used for both links.
              const linkOptions: LinkOption = {
                properties: {
                  'com.microsoft:api-version': endpoint.apiVersion,
                  'com.microsoft:channel-correlation-id': 'methods:' + uuid.v4()
                },
                rcv_settle_mode: 0
              };
              /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_019: [The `attach` method shall create a SenderLink and a ReceiverLink and attach them.]*/
              this._amqpClient.attachSenderLink(this._methodEndpoint, linkOptions, (err, senderLink) => {
                if (err) {
                  // Go through 'detaching' so that any link that did get created is detached before reporting the error.
                  this._fsm.transition('detaching', attachCallback, err);
                } else {
                  this._senderLink = senderLink;
                  // Set only after the sender link is attached, so it applies to the receiver link attach below.
                  linkOptions.autoaccept = true;
                  this._amqpClient.attachReceiverLink(this._methodEndpoint, linkOptions, (err, receiverLink) => {
                    if (err) {
                      // The sender link is already attached at this point; 'detaching' takes care of it.
                      this._fsm.transition('detaching', attachCallback, err);
                    } else {
                      this._receiverLink = receiverLink;
                      /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_021: [The `attach` method shall subscribe to the `message` and `error` events on the `ReceiverLink` object associated with the method endpoint.]*/
                      this._receiverLink.on('message', (msg) => {
                        debug('got method request');
                        debug(JSON.stringify(msg, null, 2));
                        const methodName = msg.application_properties[methodMessagePropertyKeys.methodName];
                        //
                        // The rhea library will de-serialize an encoded uuid (0x98) as a 16 byte buffer.
                        //
                        const methodRequest: any = {};
                        methodRequest.methods = { methodName: methodName };
                        // The body is only set when the message carries content; it is passed on as a string.
                        if (msg.body && msg.body.content) {
                          methodRequest.body = msg.body.content.toString();
                        }
                        // A 16-byte Buffer correlation id is converted to its uuid string form; anything else is passed through as-is.
                        if (((msg.correlation_id as any) instanceof Buffer) && (msg.correlation_id.length === 16)) {
                          methodRequest.requestId = rhea.uuid_to_string(msg.correlation_id);
                        } else {
                          methodRequest.requestId = msg.correlation_id;
                        }
                        debug(JSON.stringify(methodRequest, null, 2));
                        // Dispatch to the listeners registered through `onDeviceMethod` for this method name.
                        this.emit('method_' + methodName, methodRequest);
                      });
                      this._receiverLink.on('error', (err) => {
                        // No callback is forwarded, so once 'detached' is reached the error is emitted as an 'error' event.
                        this._fsm.transition('detaching', undefined, err);
                      });
                      this._fsm.transition('attached', attachCallback);
                    }
                  });
                }
              });
            }
          });
        },
        forceDetach: () => {
          /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_024: [The `forceDetach` method shall forcefully detach all links.]*/
          // Either link may not exist yet while attaching, hence the guards.
          if (this._senderLink) {
            this._senderLink.forceDetach();
          }
          if (this._receiverLink) {
            this._receiverLink.forceDetach();
          }
          this._fsm.transition('detached');
        },
        // Deferred through machina's `deferUntilTransition` with no target state given.
        detach: () => this._fsm.deferUntilTransition(),
        /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_026: [The `sendMethodResponse` shall fail with a `NotConnectedError` if it is called while the links are detached.]*/
        sendMethodResponse: (_response, callback) => callback(new errors.NotConnectedError('Method Links were detached - the service already considers this method failed')),
        /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_006: [The `onDeviceMethod` method shall save the `callback` argument so that it is called when the corresponding method call is received.]*/
        onDeviceMethod: (methodName, methodCallback) => {
          debug('attaching callback for method: ' + methodName + ' while attaching.');
          this.on('method_' + methodName, methodCallback);
        }
      },
      attached: {
        // Entered from 'attaching' with the callback originally passed to `attach`.
        _onEnter: (attachCallback) => {
          attachCallback();
        },
        sendMethodResponse: (response, callback) => {
          // The payload is serialized as JSON; the request id is carried as the correlation id of the response message.
          const message = new Message(JSON.stringify(response.payload));
          message.correlationId = response.requestId;
          message.properties.add(methodMessagePropertyKeys.status, response.status);
          this._amqpClient.send(message, this._methodEndpoint, undefined, callback);
        },
        /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_006: [The `onDeviceMethod` method shall save the `callback` argument so that it is called when the corresponding method call is received.]*/
        onDeviceMethod: (methodName, methodCallback) => {
          debug('attaching callback for method: ' + methodName);
          this.on('method_' + methodName, methodCallback);
        },
        /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_020: [The `attach` method shall immediately call the callback if the links are already attached.]*/
        attach: (callback) => callback(),
        detach: (callback) => this._fsm.transition('detaching', callback),
        forceDetach: () => {
          /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_024: [The `forceDetach` method shall forcefully detach all links.]*/
          this._senderLink.forceDetach();
          this._receiverLink.forceDetach();
          this._fsm.transition('detached');
        }
      },
      detaching: {
        // Entered with the callback to notify once detached (may be undefined) and the error that caused the detach, if any.
        _onEnter: (forwardedCallback, err) => {
          /*Codes_SRS_NODE_AMQP_DEVICE_METHOD_CLIENT_16_022: [The `detach` method shall detach both Sender and Receiver links.]*/
          const links = [this._senderLink, this._receiverLink];
          async.each(links, (link, callback) => {
            // A link may be undefined when the attach sequence failed part-way through.
            if (link) {
              link.detach(callback);
            } else {
              callback();
            }
          }, () => {
            // The completion handler takes no arguments: only the original `err` is forwarded to 'detached'.
            this._fsm.transition('detached', forwardedCallback, err);
          });
        },
        // Any other input received while detaching is replayed once the 'detached' state is reached.
        '*': (_callback) => this._fsm.deferUntilTransition('detached')
      }
    }
  });
  // Trace every state change for debugging.
  this._fsm.on('transition', (transition) => {
    debug(transition.fromState + ' -> ' + transition.toState + ' (action:' + transition.action + ')');
  });
}