in provisioning/transport/mqtt/src/mqtt.ts [49:217]
constructor(mqttBase?: MqttBase) {
super();
this._mqttBase = mqttBase || new MqttBase();
this._config.pollingInterval = ProvisioningDeviceConstants.defaultPollingInterval;
const responseHandler = (topic: string, payload: any) => {
let payloadString: string = payload.toString('ascii');
debug('message received on ' + topic);
debug('request payload is: ' + payloadString);
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_010: [ When waiting for responses, `registrationRequest` shall watch for messages with a topic named $dps/registrations/res/<status>/?$rid=<rid>.] */
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_024: [ When waiting for responses, `queryOperationStatus` shall watch for messages with a topic named $dps/registrations/res/<status>/?$rid=<rid>.] */
let match = topic.match(/^\$dps\/registrations\/res\/(.*)\/\?(.*)$/);
if (!!match && match.length === 3) {
let queryParameters = queryString.parse(match[2]);
if (queryParameters.$rid) {
let rid: string = queryParameters.$rid as string;
if (this._operations[rid]) {
let status: number = Number(match[1]);
let payloadJson: any = JSON.parse(payloadString);
let handler = this._operations[rid].handler;
let statusString = this._operations[rid].statusString;
let operationId = this._operations[rid].operationId;
let retryAfterInMilliseconds: number = this._config.pollingInterval;
const retryParameter = 'retry-after';
/* Codes_SRS_NODE_PROVISIONING_MQTT_06_005: [ If the response to the `queryOperationStatus` contains a query parameter of `retry-after` that value * 1000 shall be the value of `callback` `pollingInterval` argument, otherwise default.] */
/* Codes_SRS_NODE_PROVISIONING_MQTT_06_006: [ If the response to the `registrationRequest` contains a query parameter of `retry-after` that value * 1000 shall be the value of `callback` `pollingInterval` argument, otherwise default.] */
if (queryParameters[retryParameter]) {
retryAfterInMilliseconds = Number(queryParameters[retryParameter]) * 1000;
}
delete this._operations[rid];
if (status < 300) {
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_013: [ When `registrationRequest` receives a successful response from the service, it shall call `callback` passing in null and the response.] */
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_027: [ When `queryOperationStatus` receives a successful response from the service, it shall call `callback` passing in null and the response.] */
handler(null, payloadJson, retryAfterInMilliseconds);
} else if (status >= 429) {
/*Codes_SRS_NODE_PROVISIONING_MQTT_06_003: [ When `registrationRequest` receives a response with status >429, it shall invoke `callback` with a result object containing property `status` with a value `registering` and no `operationId` property.] */
/*Codes_SRS_NODE_PROVISIONING_MQTT_06_004: [ When `queryOperationStatus` receives a response with status >429, it shall invoke `callback` with a result object containing property `status` with a value `assigning` and `operationId` property with value of the passed to the request.] */
handler(null, {status: statusString, operationId: operationId}, retryAfterInMilliseconds);
} else {
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_012: [ If `registrationRequest` receives a response with status >= 300 and <429, it shall consider the request failed and create an error using `translateError`.] */
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_015: [ When `registrationRequest` receives an error from the service, it shall call `callback` passing in the error.] */
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_026: [ If `queryOperationStatus` receives a response with status >= 300 and <429, it shall consider the query failed and create an error using `translateError`.] */
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_029: [ When `queryOperationStatus` receives an error from the service, it shall call `callback` passing in the error.] */
handler(translateError('incoming message failure', status, payloadJson, { topic: topic, payload: payloadJson }));
}
} else {
debug('received an unknown request id: ' + rid + ' topic: ' + topic);
}
} else {
debug('received message with no request id. Topic is: ' + topic);
}
} else {
debug('received a topic string with insufficient content: ' + topic);
}
};
const errorHandler = (err: Error) => {
this._fsm.handle('disconnect', err);
};
this._mqttBase.on('message', responseHandler);
this._mqttBase.on('error', errorHandler);
this._fsm = new machina.Fsm({
namespace: 'provisioning-transport-mqtt',
initialState: 'disconnected',
states: {
disconnected: {
_onEnter: (err, callback) => {
if (callback) {
callback(err);
} else if (err) {
this.emit('error', err);
}
},
registrationRequest: (request, rid, callback) => {
this._operations[rid] = {handler: callback, statusString: 'registering'};
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_002: [ If the transport is not connected, `registrationRequest` shall connect it and subscribe to the response topic.] */
this._fsm.handle('connect', request, (err) => {
if (err) {
callback(err);
} else {
this._fsm.handle('registrationRequest', request, rid, callback);
}
});
},
queryOperationStatus: (request, rid, operationId, callback) => {
this._operations[rid] = {handler: callback, statusString: 'assigning', operationId: operationId};
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_016: [ If the transport is not connected, `queryOperationStatus` shall connect it and subscribe to the response topic.] */
this._fsm.handle('connect', request, (err) => {
if (err) {
callback(err);
} else {
this._fsm.handle('queryOperationStatus', request, rid, operationId, callback);
}
});
},
connect: (request, callback) => this._fsm.transition('connecting', request, callback),
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_052: [ If `disconnect` is called while the transport is disconnected, it will call `callback` immediately. ] */
disconnect: (err, callback) => callback(err),
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_030: [ If `cancel` is called while the transport is disconnected, it will call `callback` immediately.] */
cancel: (callback) => callback()
},
connecting: {
_onEnter: (request, callback) => {
this._connect(request, (err) => {
if (err) {
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_051: [ If either `_mqttBase.connect` or `_mqttBase.subscribe` fails, `mqtt` will disconnect the transport. ] */
this._fsm.transition('disconnecting', err, callback);
} else {
this._fsm.transition('connected', null, request, null, callback);
}
});
},
cancel: (callback) => {
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_062: [ If `cancel` is called while the transport is in the process of connecting, it shell disconnect transport and cancel the operation that initiated the connection. ] */
this._cancelAllOperations();
this._fsm.transition('disconnecting', null, callback);
},
disconnect: (err, callback) => {
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_061: [ If `disconnect` is called while the transport is in the process of connecting, it shell disconnect connection and cancel the operation that initiated the connection. ] */
this._cancelAllOperations();
this._fsm.transition('disconnecting', err, callback);
},
'*': () => this._fsm.deferUntilTransition()
},
connected: {
_onEnter: (err, request, result, callback) => callback(err, result, request),
registrationRequest: (request, rid, callback) => {
this._operations[rid] = {handler: callback, statusString: 'registering'};
this._sendRegistrationRequest(request, rid, (err, result) => {
callback(err, result, request);
});
},
queryOperationStatus: (request, rid, operationId, callback) => {
this._operations[rid] = {handler: callback, statusString: 'assigning', operationId: operationId};
this._sendOperationStatusQuery(request, rid, operationId, (err, result) => {
callback(err, result, request);
});
},
cancel: (callback) => {
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_032: [ If `cancel` is called while the transport is connected and idle, it will call `callback` immediately.] */
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_033: [ If `cancel` is called while the transport is in the middle of a `registrationRequest` operation, it will stop listening for a response and cause `registrationRequest` call it's `callback` passing an `OperationCancelledError` error.] */
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_034: [ If `cancel` is called while the transport is in the middle of a `queryOperationStatus` operation, it will stop listening for a response and cause `registrationRequest` call it's `callback` passing an `OperationCancelledError` error.] */
this._cancelAllOperations();
callback();
},
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_054: [ If `disconnect` is called while the transport is connected and idle, it shall disconnect. ] */
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_055: [ If `disconnect` is called while the transport is in the middle of a `registrationRequest` operation, it shall cancel the `registrationRequest` operation and disconnect the transport. ] */
/* Codes_SRS_NODE_PROVISIONING_MQTT_18_056: [ If `disconnect` is called while the transport is in the middle of a `queryOperationStatus` operation, it shall cancel the `queryOperationStatus` operation and disconnect the transport. ] */
disconnect: (err, callback) => {
this._cancelAllOperations();
this._fsm.transition('disconnecting', err, callback);
}
},
disconnecting: {
_onEnter: (err, callback) => {
this._disconnect((disconnectErr) => {
this._fsm.transition('disconnected', err || disconnectErr, callback);
});
},
'*': () => this._fsm.deferUntilTransition()
}
}
});
this._fsm.on('transition', (data) => debug('MQTT State Machine: ' + data.fromState + ' -> ' + data.toState + ' (' + data.action + ')'));
}