in device/transport/mqtt/src/mqtt.ts [54:412]
constructor(authenticationProvider: AuthenticationProvider, mqttBase?: any) {
super();
this._firstConnection = true;
this._authenticationProvider = authenticationProvider;
/*Codes_SRS_NODE_DEVICE_MQTT_16_071: [The constructor shall subscribe to the `newTokenAvailable` event of the `authenticationProvider` passed as an argument if it uses tokens for authentication.]*/
if (this._authenticationProvider.type === AuthenticationType.Token) {
(<any>this._authenticationProvider).on('newTokenAvailable', (newCredentials) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_072: [If the `newTokenAvailable` event is fired, the `Mqtt` object shall do nothing if it isn't connected.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_16_073: [If the `newTokenAvailable` event is fired, the `Mqtt` object shall call `updateSharedAccessSignature` on the `mqttBase` object if it is connected.]*/
this._fsm.handle('updateSharedAccessSignature', newCredentials.sharedAccessSignature, (err) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_074: [If updating the shared access signature fails when the `newTokenAvailable` event is fired, the `Mqtt` state machine shall fire a `disconnect` event.]*/
if (err) {
this.emit('disconnect', err);
}
});
});
}
/* Codes_SRS_NODE_DEVICE_MQTT_18_025: [ If the Mqtt constructor receives a second parameter, it shall be used as a mqttBase in place of mqtt.js ]*/
if (mqttBase) {
this._mqtt = mqttBase;
} else {
this._mqtt = new MqttBase();
}
/* Codes_SRS_NODE_DEVICE_MQTT_18_026: When MqttTransport fires the close event, the Mqtt object shall emit a disconnect event */
this._mqtt.on('error', (err) => {
debug('on close');
this._fsm.handle('disconnect', () => {
this.emit('disconnect', err);
});
});
this._mqtt.on('message', this._dispatchMqttMessage.bind(this));
this._twinClient = new MqttTwinClient(this._mqtt);
/*Codes_SRS_NODE_DEVICE_MQTT_16_081: [The `Mqtt` constructor shall subscribe to the `MqttTwinClient` `twinDesiredPropertiesUpdates`.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_16_082: [A `twinDesiredPropertiesUpdates` shall be emitted by the `Mqtt` object for each `twinDesiredPropertiesUpdates` event received from the `MqttTwinClient` with the same payload. **/
this._twinClient.on('twinDesiredPropertiesUpdate', (patch) => this.emit('twinDesiredPropertiesUpdate', patch));
this._fsm = new machina.Fsm({
initialState: 'disconnected',
states: {
disconnected: {
_onEnter: (disconnectedCallback, err, result) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_085: [Once the MQTT transport is disconnected and if it is using a token authentication provider, the `stop` method of the `AuthenticationProvider` object shall be called to stop any running timer.]*/
if (this._authenticationProvider.type === AuthenticationType.Token) {
(this._authenticationProvider as SharedAccessKeyAuthenticationProvider).stop();
}
if (disconnectedCallback) {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_019: [The `connect` method shall calls its callback with an `Error` that has been translated from the `MqttBase` error using the `translateError` method if it fails to establish a connection.]*/
disconnectedCallback(translateError(err));
} else {
disconnectedCallback(undefined, result);
}
} else {
/* Codes_SRS_NODE_DEVICE_MQTT_18_026: When MqttTransport fires the close event, the Mqtt object shall emit a disconnect event */
this.emit('disconnect', err);
}
},
/*Codes_SRS_NODE_DEVICE_MQTT_16_021: [The `disconnect` method shall call its callback immediately with a `null` argument and a `results.Disconnected` second argument if `MqttBase` is already disconnected.]*/
disconnect: (callback) => callback(null, new results.Disconnected()),
connect: (callback) => {
this._fsm.transition('connecting', callback);
},
sendEvent: (message, outputProps, sendEventCallback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_023: [The `sendEvent` method shall connect the Mqtt connection if it is disconnected.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_18_045: [The `sendOutputEvent` method shall connect the Mqtt connection if it is disconnected. ]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_024: [The `sendEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to establish a connection.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_18_046: [The `sendOutputEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to establish a connection. ]*/
sendEventCallback(translateError(err));
} else {
this._fsm.handle('sendEvent', message, outputProps, sendEventCallback);
}
});
},
updateSharedAccessSignature: (_sharedAccessSignature, callback) => { callback(null, new results.SharedAccessSignatureUpdated(false)); },
sendMethodResponse: (_response, callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_034: [The `sendMethodResponse` method shall fail with a `NotConnectedError` if the `MqttBase` object is not connected.]*/
callback(new errors.NotConnectedError('device disconnected: the service already considers the method has failed'));
},
getTwin: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_075: [`getTwin` shall establish the MQTT connection by calling `connect` on the `MqttBase` object if it is disconnected.]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_076: [`getTwin` shall call its callback with an error if it fails to connect the transport]*/
callback(err);
} else {
this._fsm.handle('getTwin', callback);
}
});
},
updateTwinReportedProperties: (patch, callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_078: [`updateTwinReportedProperties` shall establish the MQTT connection by calling `connect` on the `MqttBase` object if it is disconnected.]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_079: [`updateTwinReportedProperties` shall call its callback with an error if it fails to connect the transport]*/
callback(err);
} else {
this._fsm.handle('updateTwinReportedProperties', patch, callback);
}
});
},
enableC2D: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_047: [`enableC2D` shall connect the MQTT connection if it is disconnected.]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_048: [`enableC2D` shall calls its callback with an `Error` object if it fails to connect.]*/
callback(err);
} else {
this._fsm.handle('enableC2D', callback);
}
});
},
enableMethods: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_038: [`enableMethods` shall connect the MQTT connection if it is disconnected.]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_039: [`enableMethods` shall calls its callback with an `Error` object if it fails to connect.]*/
callback(err);
} else {
this._fsm.handle('enableMethods', callback);
}
});
},
enableInputMessages: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_18_059: [ `enableInputMessages` shall connect the MQTT connection if it is disconnected. ]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_18_060: [ `enableInputMessages` shall calls its callback with an `Error` object if it fails to connect. ]*/
callback(err);
} else {
this._fsm.handle('enableInputMessages', callback);
}
});
},
disableC2D: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_041: [`disableC2D` shall call its callback immediately if the MQTT connection is already disconnected.]*/
callback();
},
disableMethods: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_044: [`disableMethods` shall call its callback immediately if the MQTT connection is already disconnected.]*/
callback();
},
enableTwinDesiredPropertiesUpdates: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_057: [`enableTwinDesiredPropertiesUpdates` shall connect the MQTT connection if it is disconnected.]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_058: [`enableTwinDesiredPropertiesUpdates` shall calls its callback with an `Error` object if it fails to connect.]*/
callback(err);
} else {
this._fsm.handle('enableTwinDesiredPropertiesUpdates', callback);
}
});
},
disableTwinDesiredPropertiesUpdates: (callback) => callback(),
disableInputMessages: (callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_18_064: [ `disableInputMessages` shall call its callback immediately if the MQTT connection is already disconnected. ]*/
callback();
},
},
connecting: {
_onEnter: (connectCallback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_067: [The `connect` method shall call the `getDeviceCredentials` method of the `AuthenticationProvider` object passed to the constructor to obtain the credentials of the device.]*/
this._authenticationProvider.getDeviceCredentials((err, credentials) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_068: [The `connect` method shall call its callback with the error returned by `getDeviceCredentials` if it fails to return the device credentials.]*/
this._fsm.transition('disconnected', connectCallback, err);
} else {
this._configureEndpoints(credentials);
this._ensureAgentString(() => {
const baseConfig = this._getBaseTransportConfig(credentials);
this._mqtt.connect(baseConfig, (err, result) => {
debug('connect');
if (err) {
debugErrors('Connect error: ' + err);
if (this._firstConnection) {
/* Codes_SRS_NODE_DEVICE_MQTT_41_006: [The `connect` method shall call its callback with an `UnauthorizedError` returned by the primary call to `connect` in the base MQTT client.]*/
this._fsm.transition('disconnected', connectCallback, new Error('Failure on first connection (Not authorized): ' + err.message));
} else {
/* Codes_SRS_NODE_DEVICE_MQTT_41_007: [The `connect` method shall call its callback with the error returned by the non-primary call to `connect` in the base MQTT client.]*/
this._fsm.transition('disconnected', connectCallback, err);
}
} else {
this._firstConnection = false;
this._fsm.transition('connected', connectCallback, result);
}
});
});
}
});
},
disconnect: (disconnectCallback) => {
this._fsm.transition('disconnecting', disconnectCallback);
},
/*Codes_SRS_NODE_DEVICE_MQTT_16_025: [If `sendEvent` is called while `MqttBase` is establishing the connection, it shall wait until the connection is established and then send the event.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_16_035: [If `sendEvent` is called while `MqttBase` is establishing the connection, and `MqttBase` fails to establish the connection, then sendEvent shall fail.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_18_047: [If `sendOutputEvent` is called while `MqttBase` is establishing the connection, it shall wait until the connection is established and then send the event. ]*/
/*Codes_SRS_NODE_DEVICE_MQTT_18_048: [If `sendOutputEvent` is called while `MqttBase` is establishing the connection, and `MqttBase` fails to establish the connection, then sendEvent shall fail. ]*/
'*': () => this._fsm.deferUntilTransition()
},
connected: {
_onEnter: (connectedCallback, connectResult) => {
/*Codes_SRS_NODE_DEVICE_MQTT_41_016: [ The `connect` method shall emit `connected` once the transport is connected ]*/
this.emit('connected');
/*Codes_SRS_NODE_DEVICE_MQTT_16_020: [The `connect` method shall call its callback with a `null` error parameter and a `results.Connected` response if `MqttBase` successfully connects.]*/
if (connectedCallback) connectedCallback(null, new results.Connected(connectResult));
},
/*Codes_SRS_NODE_DEVICE_MQTT_16_018: [The `connect` method shall call its callback immediately if `MqttBase` is already connected.]*/
connect: (connectCallback) => connectCallback(null, new results.Connected()),
disconnect: (disconnectCallback) => {
this._fsm.transition('disconnecting', disconnectCallback);
},
sendEvent: (message, outputProps, sendEventCallback) => {
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_008: [The `sendEvent` method shall use a topic formatted using the following convention: `devices/<deviceId>/messages/events/`.]*/
let topic = this._getEventTopicFromMessage(message, outputProps);
if (outputProps) {
topic += '/';
}
// This will not catch all messages that exceed IoT Hub limits because properties contribute the size as well.
if ((message?.data?.length ?? 0) > 256 * 1024) {
sendEventCallback(new errors.MessageTooLargeError('Message size is greater than 256KiB'));
return;
}
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_010: [** The `sendEvent` method shall use QoS level of 1.]*/
this._mqtt.publish(topic, message.data, { qos: 1, retain: false }, (err, result) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_027: [The `sendEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to publish the message.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_18_050: [The `sendOutputEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to publish the message. ]*/
sendEventCallback(translateError(err));
} else {
/*Codes_SRS_NODE_DEVICE_MQTT_41_004 [ The `sendEvent` method shall call its callback with a `MessageEnqueued` ]*/
/*Codes_SRS_NODE_DEVICE_MQTT_41_005 [ The `sendOutputEvent` method shall call its callback with a `MessageEnqueued` ]*/
sendEventCallback(null, new results.MessageEnqueued(result));
}
});
},
updateSharedAccessSignature: (sharedAccessSignature, callback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_028: [The `updateSharedAccessSignature` method shall call the `updateSharedAccessSignature` method on the `MqttBase` object if it is connected.]*/
this._mqtt.updateSharedAccessSignature(sharedAccessSignature, (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_009: [The `updateSharedAccessSignature` method shall call the `done` method with an `Error` object if `MqttBase.updateSharedAccessSignature` fails.]*/
this._fsm.transition('disconnected', callback, err);
} else {
/*Codes_SRS_NODE_DEVICE_MQTT_16_010: [The `updateSharedAccessSignature` method shall call the `done` callback with a `null` error object and a `SharedAccessSignatureUpdated` object with its `needToReconnect` property set to `false`, if `MqttBase.updateSharedAccessSignature` succeeds.]*/
callback(null, new results.SharedAccessSignatureUpdated(false));
}
});
},
sendMethodResponse: (response, callback) => {
// Codes_SRS_NODE_DEVICE_MQTT_13_002: [ sendMethodResponse shall build an MQTT topic name in the format: $iothub/methods/res/<STATUS>/?$rid=<REQUEST ID>&<PROPERTIES> where <STATUS> is response.status. ]
// Codes_SRS_NODE_DEVICE_MQTT_13_003: [ sendMethodResponse shall build an MQTT topic name in the format: $iothub/methods/res/<STATUS>/?$rid=<REQUEST ID>&<PROPERTIES> where <REQUEST ID> is response.requestId. ]
// Codes_SRS_NODE_DEVICE_MQTT_13_004: [ sendMethodResponse shall build an MQTT topic name in the format: $iothub/methods/res/<STATUS>/?$rid=<REQUEST ID>&<PROPERTIES> where <PROPERTIES> is URL encoded. ]
const topicName = util.format(
TOPIC_RESPONSE_PUBLISH_FORMAT,
'methods',
response.status,
response.requestId
);
debug('sending response using topic: ' + topicName);
debug(JSON.stringify(response.payload));
// publish the response message
this._mqtt.publish(topicName, JSON.stringify(response.payload), { qos: 0, retain: false }, (err) => {
// Codes_SRS_NODE_DEVICE_MQTT_13_006: [ If the MQTT publish fails then an error shall be returned via the done callback's first parameter. ]
// Codes_SRS_NODE_DEVICE_MQTT_13_007: [ If the MQTT publish is successful then the done callback shall be invoked passing null for the first parameter. ]
callback(err ? translateError(err) : null);
});
},
/* Codes_SRS_NODE_DEVICE_MQTT_41_008: [`enableC2D` shall not subscribe multiple times if already subscribed.]*/
enableC2D: (callback) => {
if (this._topics.message && this._topics.message.subscribed) {
debug('already subscribed to `message`, doing nothing...');
callback();
} else {
this._setupSubscription(this._topics.message, 1, callback);
}
},
/* Codes_SRS_NODE_DEVICE_MQTT_41_009: [`enableMethods` shall not subscribe multiple times if already subscribed.]*/
enableMethods: (callback) => {
if (this._topics.method && this._topics.method.subscribed) {
debug('already subscribed to `method`, doing nothing...');
callback();
} else {
this._setupSubscription(this._topics.method, 0, callback);
}
},
/* Codes_SRS_NODE_DEVICE_MQTT_41_010: [`enableInputMessages` shall not subscribe multiple times if already subscribed.]*/
enableInputMessages: (callback) => {
if (this._topics.inputMessage && this._topics.inputMessage.subscribed) {
debug('already subscribed to `inputMessages`, doing nothing...');
callback();
} else {
this._setupSubscription(this._topics.inputMessage, 1, callback);
}
},
/* Codes_SRS_NODE_DEVICE_MQTT_41_011: [`disableC2D` shall unsubscribe from the topic for C2D messages only if it is currently subscribed.]*/
disableC2D: (callback) => {
if (this._topics.message && this._topics.message.subscribed) {
this._removeSubscription(this._topics.message, callback);
} else {
debug('not subscribed to `message`, so doing nothing...');
callback();
}
},
/* Codes_SRS_NODE_DEVICE_MQTT_41_012: [`disableMethods` shall unsubscribe from the topic for direct methods only if it is currently subscribed.]*/
disableMethods: (callback) => {
if (this._topics.method && this._topics.method.subscribed) {
this._removeSubscription(this._topics.method, callback);
} else {
debug('not subscribed to `method`, so doing nothing...');
callback();
}
},
/* Codes_SRS_NODE_DEVICE_MQTT_41_013: [`disableInputMessages` shall unsubscribe from the topic for inputMessages only if it is currently subscribed.]*/
disableInputMessages: (callback) => {
if (this._topics.inputMessage && this._topics.inputMessage.subscribed) {
this._removeSubscription(this._topics.inputMessage, callback);
} else {
debug('not subscribed to `method`, so doing nothing...');
callback();
}
},
/*Codes_SRS_NODE_DEVICE_MQTT_16_077: [`getTwin` shall call the `getTwin` method on the `MqttTwinClient` object and pass it its callback.]*/
getTwin: (callback) => this._twinClient.getTwin(callback),
/*Codes_SRS_NODE_DEVICE_MQTT_16_080: [`updateTwinReportedProperties` shall call the `updateTwinReportedProperties` method on the `MqttTwinClient` object and pass it its callback.]*/
updateTwinReportedProperties: (patch, callback) => this._twinClient.updateTwinReportedProperties(patch, callback),
/*Codes_SRS_NODE_DEVICE_MQTT_16_059: [`enableTwinDesiredPropertiesUpdates` shall call the `enableTwinDesiredPropertiesUpdates` on the `MqttTwinClient` object created by the constructor and pass it its callback.]*/
enableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.enableTwinDesiredPropertiesUpdates(callback),
/*Codes_SRS_NODE_DEVICE_MQTT_16_083: [`disableTwinDesiredPropertiesUpdates` shall call the `disableTwinDesiredPropertiesUpdates` on the `MqttTwinClient` object created by the constructor and pass it its callback.]*/
disableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.disableTwinDesiredPropertiesUpdates(callback),
},
disconnecting: {
_onEnter: (disconnectCallback, _err) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_001: [The `disconnect` method should call the `disconnect` method on `MqttBase`.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_16_022: [The `disconnect` method shall call its callback with a `null` error parameter and a `results.Disconnected` response if `MqttBase` successfully disconnects if not disconnected already.]*/
this._mqtt.disconnect((err, result) => {
this._fsm.transition('disconnected', disconnectCallback, err, new results.Disconnected(result));
});
},
/*Codes_SRS_NODE_DEVICE_MQTT_16_026: [If `sendEvent` is called while `MqttBase` is disconnecting, it shall wait until the disconnection is complete and then try to connect again and send the event. ]*/
/*Codes_SRS_NODE_DEVICE_MQTT_18_049: [If `sendOutputEvent` is called while `MqttBase` is disconnecting, it shall wait until the disconnection is complete and then try to connect again and send the event. ]*/
'*': () => this._fsm.deferUntilTransition()
}
}
});
this._fsm.on('transition', (data) => {
debug(data.fromState + ' -> ' + data.toState + ' (' + data.action + ')');
});
}