device/transport/mqtt/src/mqtt.ts (690 lines of code) (raw):

// Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. 'use strict'; import * as querystring from 'querystring'; import * as URL from 'url'; import * as machina from 'machina'; import { endpoint, results, errors, Message, AuthenticationProvider, AuthenticationType, TransportConfig } from 'azure-iot-common'; import { MethodMessage, DeviceMethodResponse, DeviceTransport, DeviceClientOptions, TwinProperties, SharedAccessKeyAuthenticationProvider } from 'azure-iot-device'; import { X509AuthenticationProvider, SharedAccessSignatureAuthenticationProvider } from 'azure-iot-device'; import { getUserAgentString } from 'azure-iot-device'; import { EventEmitter } from 'events'; import * as util from 'util'; import * as dbg from 'debug'; const debug = dbg('azure-iot-device-mqtt:Mqtt'); const debugErrors = dbg('azure-iot-device-mqtt:Mqtt:Errors'); import { MqttBaseTransportConfig, MqttBase, translateError } from 'azure-iot-mqtt-base'; import { MqttTwinClient } from './mqtt_twin_client'; const TOPIC_RESPONSE_PUBLISH_FORMAT = '$iothub/%s/res/%d/?$rid=%s'; /** * @class module:azure-iot-device-mqtt.Mqtt * @classdesc Provides MQTT transport for the device [client]{@link module:azure-iot-device.Client} class. * * This class is not meant to be used directly, instead it should just be passed to the [client]{@link module:azure-iot-device.Client} object. * * @param {Object} config Configuration object derived from the connection string by the client. */ /* Codes_SRS_NODE_DEVICE_MQTT_12_001: [The `Mqtt` constructor shall accept the transport configuration structure Codes_SRS_NODE_DEVICE_MQTT_12_002: [The `Mqtt` constructor shall store the configuration structure in a member variable Codes_SRS_NODE_DEVICE_MQTT_12_003: [The Mqtt constructor shall create an base transport object and store it in a member variable.] */ export class Mqtt extends EventEmitter implements DeviceTransport { /** * @private */ protected _authenticationProvider: AuthenticationProvider; private _mqtt: MqttBase; private _twinClient: MqttTwinClient; private _topicTelemetryPublish: string; private _fsm: machina.Fsm; private _topics: { [key: string]: TopicDescription }; private _userAgentString: string; private _productInfo: string; private _firstConnection: boolean; private _mid: string = ''; /** * @private */ constructor(authenticationProvider: AuthenticationProvider, mqttBase?: any) { super(); this._firstConnection = true; this._authenticationProvider = authenticationProvider; /*Codes_SRS_NODE_DEVICE_MQTT_16_071: [The constructor shall subscribe to the `newTokenAvailable` event of the `authenticationProvider` passed as an argument if it uses tokens for authentication.]*/ if (this._authenticationProvider.type === AuthenticationType.Token) { (<any>this._authenticationProvider).on('newTokenAvailable', (newCredentials) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_072: [If the `newTokenAvailable` event is fired, the `Mqtt` object shall do nothing if it isn't connected.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_073: [If the `newTokenAvailable` event is fired, the `Mqtt` object shall call `updateSharedAccessSignature` on the `mqttBase` object if it is connected.]*/ this._fsm.handle('updateSharedAccessSignature', newCredentials.sharedAccessSignature, (err) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_074: [If updating the shared access signature fails when the `newTokenAvailable` event is fired, the `Mqtt` state machine shall fire a `disconnect` event.]*/ if (err) { this.emit('disconnect', err); } }); }); } /* Codes_SRS_NODE_DEVICE_MQTT_18_025: [ If the Mqtt constructor receives a second parameter, it shall be used as a mqttBase in place of mqtt.js ]*/ if (mqttBase) { this._mqtt = mqttBase; } else { this._mqtt = new MqttBase(); } /* Codes_SRS_NODE_DEVICE_MQTT_18_026: When MqttTransport fires the close event, the Mqtt object shall emit a disconnect event */ this._mqtt.on('error', (err) => { debug('on close'); this._fsm.handle('disconnect', () => { this.emit('disconnect', err); }); }); this._mqtt.on('message', this._dispatchMqttMessage.bind(this)); this._twinClient = new MqttTwinClient(this._mqtt); /*Codes_SRS_NODE_DEVICE_MQTT_16_081: [The `Mqtt` constructor shall subscribe to the `MqttTwinClient` `twinDesiredPropertiesUpdates`.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_082: [A `twinDesiredPropertiesUpdates` shall be emitted by the `Mqtt` object for each `twinDesiredPropertiesUpdates` event received from the `MqttTwinClient` with the same payload. **/ this._twinClient.on('twinDesiredPropertiesUpdate', (patch) => this.emit('twinDesiredPropertiesUpdate', patch)); this._fsm = new machina.Fsm({ initialState: 'disconnected', states: { disconnected: { _onEnter: (disconnectedCallback, err, result) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_085: [Once the MQTT transport is disconnected and if it is using a token authentication provider, the `stop` method of the `AuthenticationProvider` object shall be called to stop any running timer.]*/ if (this._authenticationProvider.type === AuthenticationType.Token) { (this._authenticationProvider as SharedAccessKeyAuthenticationProvider).stop(); } if (disconnectedCallback) { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_16_019: [The `connect` method shall calls its callback with an `Error` that has been translated from the `MqttBase` error using the `translateError` method if it fails to establish a connection.]*/ disconnectedCallback(translateError(err)); } else { disconnectedCallback(undefined, result); } } else { /* Codes_SRS_NODE_DEVICE_MQTT_18_026: When MqttTransport fires the close event, the Mqtt object shall emit a disconnect event */ this.emit('disconnect', err); } }, /*Codes_SRS_NODE_DEVICE_MQTT_16_021: [The `disconnect` method shall call its callback immediately with a `null` argument and a `results.Disconnected` second argument if `MqttBase` is already disconnected.]*/ disconnect: (callback) => callback(null, new results.Disconnected()), connect: (callback) => { this._fsm.transition('connecting', callback); }, sendEvent: (message, outputProps, sendEventCallback) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_023: [The `sendEvent` method shall connect the Mqtt connection if it is disconnected.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_045: [The `sendOutputEvent` method shall connect the Mqtt connection if it is disconnected. ]*/ this._fsm.handle('connect', (err) => { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_16_024: [The `sendEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to establish a connection.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_046: [The `sendOutputEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to establish a connection. ]*/ sendEventCallback(translateError(err)); } else { this._fsm.handle('sendEvent', message, outputProps, sendEventCallback); } }); }, updateSharedAccessSignature: (_sharedAccessSignature, callback) => { callback(null, new results.SharedAccessSignatureUpdated(false)); }, sendMethodResponse: (_response, callback) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_034: [The `sendMethodResponse` method shall fail with a `NotConnectedError` if the `MqttBase` object is not connected.]*/ callback(new errors.NotConnectedError('device disconnected: the service already considers the method has failed')); }, getTwin: (callback) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_075: [`getTwin` shall establish the MQTT connection by calling `connect` on the `MqttBase` object if it is disconnected.]*/ this._fsm.handle('connect', (err) => { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_16_076: [`getTwin` shall call its callback with an error if it fails to connect the transport]*/ callback(err); } else { this._fsm.handle('getTwin', callback); } }); }, updateTwinReportedProperties: (patch, callback) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_078: [`updateTwinReportedProperties` shall establish the MQTT connection by calling `connect` on the `MqttBase` object if it is disconnected.]*/ this._fsm.handle('connect', (err) => { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_16_079: [`updateTwinReportedProperties` shall call its callback with an error if it fails to connect the transport]*/ callback(err); } else { this._fsm.handle('updateTwinReportedProperties', patch, callback); } }); }, enableC2D: (callback) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_047: [`enableC2D` shall connect the MQTT connection if it is disconnected.]*/ this._fsm.handle('connect', (err) => { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_16_048: [`enableC2D` shall calls its callback with an `Error` object if it fails to connect.]*/ callback(err); } else { this._fsm.handle('enableC2D', callback); } }); }, enableMethods: (callback) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_038: [`enableMethods` shall connect the MQTT connection if it is disconnected.]*/ this._fsm.handle('connect', (err) => { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_16_039: [`enableMethods` shall calls its callback with an `Error` object if it fails to connect.]*/ callback(err); } else { this._fsm.handle('enableMethods', callback); } }); }, enableInputMessages: (callback) => { /*Codes_SRS_NODE_DEVICE_MQTT_18_059: [ `enableInputMessages` shall connect the MQTT connection if it is disconnected. ]*/ this._fsm.handle('connect', (err) => { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_18_060: [ `enableInputMessages` shall calls its callback with an `Error` object if it fails to connect. ]*/ callback(err); } else { this._fsm.handle('enableInputMessages', callback); } }); }, disableC2D: (callback) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_041: [`disableC2D` shall call its callback immediately if the MQTT connection is already disconnected.]*/ callback(); }, disableMethods: (callback) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_044: [`disableMethods` shall call its callback immediately if the MQTT connection is already disconnected.]*/ callback(); }, enableTwinDesiredPropertiesUpdates: (callback) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_057: [`enableTwinDesiredPropertiesUpdates` shall connect the MQTT connection if it is disconnected.]*/ this._fsm.handle('connect', (err) => { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_16_058: [`enableTwinDesiredPropertiesUpdates` shall calls its callback with an `Error` object if it fails to connect.]*/ callback(err); } else { this._fsm.handle('enableTwinDesiredPropertiesUpdates', callback); } }); }, disableTwinDesiredPropertiesUpdates: (callback) => callback(), disableInputMessages: (callback) => { /*Codes_SRS_NODE_DEVICE_MQTT_18_064: [ `disableInputMessages` shall call its callback immediately if the MQTT connection is already disconnected. ]*/ callback(); }, }, connecting: { _onEnter: (connectCallback) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_067: [The `connect` method shall call the `getDeviceCredentials` method of the `AuthenticationProvider` object passed to the constructor to obtain the credentials of the device.]*/ this._authenticationProvider.getDeviceCredentials((err, credentials) => { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_16_068: [The `connect` method shall call its callback with the error returned by `getDeviceCredentials` if it fails to return the device credentials.]*/ this._fsm.transition('disconnected', connectCallback, err); } else { this._configureEndpoints(credentials); this._ensureAgentString(() => { const baseConfig = this._getBaseTransportConfig(credentials); this._mqtt.connect(baseConfig, (err, result) => { debug('connect'); if (err) { debugErrors('Connect error: ' + err); if (this._firstConnection) { /* Codes_SRS_NODE_DEVICE_MQTT_41_006: [The `connect` method shall call its callback with an `UnauthorizedError` returned by the primary call to `connect` in the base MQTT client.]*/ this._fsm.transition('disconnected', connectCallback, new Error('Failure on first connection (Not authorized): ' + err.message)); } else { /* Codes_SRS_NODE_DEVICE_MQTT_41_007: [The `connect` method shall call its callback with the error returned by the non-primary call to `connect` in the base MQTT client.]*/ this._fsm.transition('disconnected', connectCallback, err); } } else { this._firstConnection = false; this._fsm.transition('connected', connectCallback, result); } }); }); } }); }, disconnect: (disconnectCallback) => { this._fsm.transition('disconnecting', disconnectCallback); }, /*Codes_SRS_NODE_DEVICE_MQTT_16_025: [If `sendEvent` is called while `MqttBase` is establishing the connection, it shall wait until the connection is established and then send the event.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_035: [If `sendEvent` is called while `MqttBase` is establishing the connection, and `MqttBase` fails to establish the connection, then sendEvent shall fail.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_047: [If `sendOutputEvent` is called while `MqttBase` is establishing the connection, it shall wait until the connection is established and then send the event. ]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_048: [If `sendOutputEvent` is called while `MqttBase` is establishing the connection, and `MqttBase` fails to establish the connection, then sendEvent shall fail. ]*/ '*': () => this._fsm.deferUntilTransition() }, connected: { _onEnter: (connectedCallback, connectResult) => { /*Codes_SRS_NODE_DEVICE_MQTT_41_016: [ The `connect` method shall emit `connected` once the transport is connected ]*/ this.emit('connected'); /*Codes_SRS_NODE_DEVICE_MQTT_16_020: [The `connect` method shall call its callback with a `null` error parameter and a `results.Connected` response if `MqttBase` successfully connects.]*/ if (connectedCallback) connectedCallback(null, new results.Connected(connectResult)); }, /*Codes_SRS_NODE_DEVICE_MQTT_16_018: [The `connect` method shall call its callback immediately if `MqttBase` is already connected.]*/ connect: (connectCallback) => connectCallback(null, new results.Connected()), disconnect: (disconnectCallback) => { this._fsm.transition('disconnecting', disconnectCallback); }, sendEvent: (message, outputProps, sendEventCallback) => { /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_008: [The `sendEvent` method shall use a topic formatted using the following convention: `devices/<deviceId>/messages/events/`.]*/ let topic = this._getEventTopicFromMessage(message, outputProps); if (outputProps) { topic += '/'; } // This will not catch all messages that exceed IoT Hub limits because properties contribute the size as well. if ((message?.data?.length ?? 0) > 256 * 1024) { sendEventCallback(new errors.MessageTooLargeError('Message size is greater than 256KiB')); return; } /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_010: [** The `sendEvent` method shall use QoS level of 1.]*/ this._mqtt.publish(topic, message.data, { qos: 1, retain: false }, (err, result) => { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_16_027: [The `sendEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to publish the message.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_050: [The `sendOutputEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to publish the message. ]*/ sendEventCallback(translateError(err)); } else { /*Codes_SRS_NODE_DEVICE_MQTT_41_004 [ The `sendEvent` method shall call its callback with a `MessageEnqueued` ]*/ /*Codes_SRS_NODE_DEVICE_MQTT_41_005 [ The `sendOutputEvent` method shall call its callback with a `MessageEnqueued` ]*/ sendEventCallback(null, new results.MessageEnqueued(result)); } }); }, updateSharedAccessSignature: (sharedAccessSignature, callback) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_028: [The `updateSharedAccessSignature` method shall call the `updateSharedAccessSignature` method on the `MqttBase` object if it is connected.]*/ this._mqtt.updateSharedAccessSignature(sharedAccessSignature, (err) => { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_16_009: [The `updateSharedAccessSignature` method shall call the `done` method with an `Error` object if `MqttBase.updateSharedAccessSignature` fails.]*/ this._fsm.transition('disconnected', callback, err); } else { /*Codes_SRS_NODE_DEVICE_MQTT_16_010: [The `updateSharedAccessSignature` method shall call the `done` callback with a `null` error object and a `SharedAccessSignatureUpdated` object with its `needToReconnect` property set to `false`, if `MqttBase.updateSharedAccessSignature` succeeds.]*/ callback(null, new results.SharedAccessSignatureUpdated(false)); } }); }, sendMethodResponse: (response, callback) => { // Codes_SRS_NODE_DEVICE_MQTT_13_002: [ sendMethodResponse shall build an MQTT topic name in the format: $iothub/methods/res/<STATUS>/?$rid=<REQUEST ID>&<PROPERTIES> where <STATUS> is response.status. ] // Codes_SRS_NODE_DEVICE_MQTT_13_003: [ sendMethodResponse shall build an MQTT topic name in the format: $iothub/methods/res/<STATUS>/?$rid=<REQUEST ID>&<PROPERTIES> where <REQUEST ID> is response.requestId. ] // Codes_SRS_NODE_DEVICE_MQTT_13_004: [ sendMethodResponse shall build an MQTT topic name in the format: $iothub/methods/res/<STATUS>/?$rid=<REQUEST ID>&<PROPERTIES> where <PROPERTIES> is URL encoded. ] const topicName = util.format( TOPIC_RESPONSE_PUBLISH_FORMAT, 'methods', response.status, response.requestId ); debug('sending response using topic: ' + topicName); debug(JSON.stringify(response.payload)); // publish the response message this._mqtt.publish(topicName, JSON.stringify(response.payload), { qos: 0, retain: false }, (err) => { // Codes_SRS_NODE_DEVICE_MQTT_13_006: [ If the MQTT publish fails then an error shall be returned via the done callback's first parameter. ] // Codes_SRS_NODE_DEVICE_MQTT_13_007: [ If the MQTT publish is successful then the done callback shall be invoked passing null for the first parameter. ] callback(err ? translateError(err) : null); }); }, /* Codes_SRS_NODE_DEVICE_MQTT_41_008: [`enableC2D` shall not subscribe multiple times if already subscribed.]*/ enableC2D: (callback) => { if (this._topics.message && this._topics.message.subscribed) { debug('already subscribed to `message`, doing nothing...'); callback(); } else { this._setupSubscription(this._topics.message, 1, callback); } }, /* Codes_SRS_NODE_DEVICE_MQTT_41_009: [`enableMethods` shall not subscribe multiple times if already subscribed.]*/ enableMethods: (callback) => { if (this._topics.method && this._topics.method.subscribed) { debug('already subscribed to `method`, doing nothing...'); callback(); } else { this._setupSubscription(this._topics.method, 0, callback); } }, /* Codes_SRS_NODE_DEVICE_MQTT_41_010: [`enableInputMessages` shall not subscribe multiple times if already subscribed.]*/ enableInputMessages: (callback) => { if (this._topics.inputMessage && this._topics.inputMessage.subscribed) { debug('already subscribed to `inputMessages`, doing nothing...'); callback(); } else { this._setupSubscription(this._topics.inputMessage, 1, callback); } }, /* Codes_SRS_NODE_DEVICE_MQTT_41_011: [`disableC2D` shall unsubscribe from the topic for C2D messages only if it is currently subscribed.]*/ disableC2D: (callback) => { if (this._topics.message && this._topics.message.subscribed) { this._removeSubscription(this._topics.message, callback); } else { debug('not subscribed to `message`, so doing nothing...'); callback(); } }, /* Codes_SRS_NODE_DEVICE_MQTT_41_012: [`disableMethods` shall unsubscribe from the topic for direct methods only if it is currently subscribed.]*/ disableMethods: (callback) => { if (this._topics.method && this._topics.method.subscribed) { this._removeSubscription(this._topics.method, callback); } else { debug('not subscribed to `method`, so doing nothing...'); callback(); } }, /* Codes_SRS_NODE_DEVICE_MQTT_41_013: [`disableInputMessages` shall unsubscribe from the topic for inputMessages only if it is currently subscribed.]*/ disableInputMessages: (callback) => { if (this._topics.inputMessage && this._topics.inputMessage.subscribed) { this._removeSubscription(this._topics.inputMessage, callback); } else { debug('not subscribed to `method`, so doing nothing...'); callback(); } }, /*Codes_SRS_NODE_DEVICE_MQTT_16_077: [`getTwin` shall call the `getTwin` method on the `MqttTwinClient` object and pass it its callback.]*/ getTwin: (callback) => this._twinClient.getTwin(callback), /*Codes_SRS_NODE_DEVICE_MQTT_16_080: [`updateTwinReportedProperties` shall call the `updateTwinReportedProperties` method on the `MqttTwinClient` object and pass it its callback.]*/ updateTwinReportedProperties: (patch, callback) => this._twinClient.updateTwinReportedProperties(patch, callback), /*Codes_SRS_NODE_DEVICE_MQTT_16_059: [`enableTwinDesiredPropertiesUpdates` shall call the `enableTwinDesiredPropertiesUpdates` on the `MqttTwinClient` object created by the constructor and pass it its callback.]*/ enableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.enableTwinDesiredPropertiesUpdates(callback), /*Codes_SRS_NODE_DEVICE_MQTT_16_083: [`disableTwinDesiredPropertiesUpdates` shall call the `disableTwinDesiredPropertiesUpdates` on the `MqttTwinClient` object created by the constructor and pass it its callback.]*/ disableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.disableTwinDesiredPropertiesUpdates(callback), }, disconnecting: { _onEnter: (disconnectCallback, _err) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_001: [The `disconnect` method should call the `disconnect` method on `MqttBase`.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_022: [The `disconnect` method shall call its callback with a `null` error parameter and a `results.Disconnected` response if `MqttBase` successfully disconnects if not disconnected already.]*/ this._mqtt.disconnect((err, result) => { this._fsm.transition('disconnected', disconnectCallback, err, new results.Disconnected(result)); }); }, /*Codes_SRS_NODE_DEVICE_MQTT_16_026: [If `sendEvent` is called while `MqttBase` is disconnecting, it shall wait until the disconnection is complete and then try to connect again and send the event. ]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_049: [If `sendOutputEvent` is called while `MqttBase` is disconnecting, it shall wait until the disconnection is complete and then try to connect again and send the event. ]*/ '*': () => this._fsm.deferUntilTransition() } } }); this._fsm.on('transition', (data) => { debug(data.fromState + ' -> ' + data.toState + ' (' + data.action + ')'); }); } /** * @private * @method module:azure-iot-device-mqtt.Mqtt#connect * @description Establishes the connection to the Azure IoT Hub instance using the MQTT protocol. * * @param {Function} done callback that shall be called when the connection is established. */ /* Codes_SRS_NODE_DEVICE_MQTT_12_004: [The connect method shall call the connect method on MqttTransport */ connect(done?: (err?: Error, result?: any) => void): void { this._fsm.handle('connect', done); } /** * @private * @method module:azure-iot-device-mqtt.Mqtt#disconnect * @description Terminates the connection to the IoT Hub instance. * * @param {Function} done Callback that shall be called when the connection is terminated. */ /* Codes_SRS_NODE_DEVICE_MQTT_16_001: [The disconnect method should call the disconnect method on MqttTransport.] */ disconnect(done?: (err?: Error, result?: any) => void): void { this._fsm.handle('disconnect', done); } /** * @private * @method module:azure-iot-device-mqtt.Mqtt#sendEvent * @description Sends an event to the server. * * @param {Message} message Message used for the content of the event sent to the server. */ /* Codes_SRS_NODE_DEVICE_MQTT_12_005: [The sendEvent method shall call the publish method on MqttTransport */ sendEvent(message: Message, done?: (err?: Error, result?: any) => void): void { debug('Invoking sendEvent handler in the device client FSM' + JSON.stringify(message)); this._fsm.handle('sendEvent', message, undefined, (err, puback) => { if (err) { debugErrors('send error: ' + err); done(err); } else { debug('PUBACK: ' + JSON.stringify(puback)); done(null, puback); } }); } /** * @private * @method module:azure-iot-device-mqtt.Mqtt#sendEventBatch * @description Not Implemented. * @param {Message[]} messages The [messages]{@linkcode module:common/message.Message} * to be sent. * @param {Function} done The callback to be invoked when `sendEventBatch` * completes execution. */ sendEventBatch(_messages: Message[], _done: (err?: Error, result?: results.MessageEnqueued) => void): void { /*Codes_SRS_NODE_DEVICE_MQTT_16_056: [The `sendEventBatch` method shall throw a `NotImplementedError`]*/ throw new errors.NotImplementedError('MQTT Transport does not support batching yet'); } /** * @private * @deprecated // Implementation test belongs in the client. * @method module:azure-iot-device-mqtt.Mqtt#complete * @description Settles the message as complete and calls the done callback with the result. * * @param {Message} message The message to settle as complete. * @param {Function} done The callback that shall be called with the error or result object. */ complete(_message: Message, done?: (err?: Error, result?: any) => void): void { /*Codes_SRS_NODE_DEVICE_MQTT_16_005: [The ‘complete’ method shall call the callback given as argument immediately since all messages are automatically completed.]*/ done(null, new results.MessageCompleted()); } /** * @private * @deprecated // Implementation test belongs in the client. * @method module:azure-iot-device-mqtt.Mqtt#reject * @description Settles the message as rejected and calls the done callback with the result. * * @throws {Error} The MQTT transport does not support rejecting messages. */ reject(): void { /*Codes_SRS_NODE_DEVICE_MQTT_16_006: [The ‘reject’ method shall throw because MQTT doesn’t support rejecting messages.] */ throw new errors.NotImplementedError('the MQTT transport does not support rejecting messages.'); } /** * @private * @deprecated // Implementation test belongs in the client. * @method module:azure-iot-device-mqtt.Mqtt#abandon * @description Settles the message as abandoned and calls the done callback with the result. * * @throws {Error} The MQTT transport does not support abandoning messages. */ abandon(): void { /*Codes_SRS_NODE_DEVICE_MQTT_16_004: [The ‘abandon’ method shall throw because MQTT doesn’t support abandoning messages.] */ throw new errors.NotImplementedError('The MQTT transport does not support abandoning messages.'); } /** * @private * @method module:azure-iot-device-mqtt.Mqtt#updateSharedAccessSignature * @description This methods sets the SAS token used to authenticate with the IoT Hub service. * * @param {String} sharedAccessSignature The new SAS token. * @param {Function} done The callback to be invoked when `updateSharedAccessSignature` completes. */ updateSharedAccessSignature(sharedAccessSignature: string, done: (err?: Error, result?: any) => void): void { debug('updateSharedAccessSignature'); /*Codes_SRS_NODE_DEVICE_MQTT_16_007: [The `updateSharedAccessSignature` method shall save the new shared access signature given as a parameter to its configuration.]*/ (this._authenticationProvider as SharedAccessSignatureAuthenticationProvider).updateSharedAccessSignature(sharedAccessSignature); this._fsm.handle('updateSharedAccessSignature', sharedAccessSignature, (err, result) => { done(err, result); }); } /** * @private * @method module:azure-iot-device-mqtt.Mqtt#setOptions * @description This methods sets the MQTT specific options of the transport. * * @param {object} options Options to set. Currently for MQTT these are the x509 cert, key, and optional passphrase properties. (All strings) * @param {Function} done The callback to be invoked when `setOptions` completes. */ setOptions(options: DeviceClientOptions, done?: (err?: Error, result?: any) => void): void { /*Codes_SRS_NODE_DEVICE_MQTT_16_011: [The `setOptions` method shall throw a `ReferenceError` if the `options` argument is falsy]*/ if (!options) throw new ReferenceError('The options parameter can not be \'' + options + '\''); /*Codes_SRS_NODE_DEVICE_MQTT_16_015: [The `setOptions` method shall throw an `ArgumentError` if the `cert` property is populated but the device uses symmetric key authentication.]*/ if (this._authenticationProvider.type === AuthenticationType.Token && options.cert) throw new errors.ArgumentError('Cannot set x509 options on a device that uses token authentication.'); /*Codes_SRS_NODE_DEVICE_MQTT_41_014: [For a Plug and Play Device the model id should be included as `&model-id=<DEVICE’s MODEL ID>` after the api-version ] */ if (options.modelId) { this._mid = '&model-id=' + options.modelId; } /*Codes_SRS_NODE_DEVICE_MQTT_41_001: [The MQTT transport should use the productInfo string in the `options` object if present]*/ if (options.productInfo) { // To enforce proper use of the productInfo option, if the setOption is called after HTTP calls have already been made (therefore _userAgentString already set) an error is thrown. if (this._userAgentString) { /*Codes_SRS_NODE_DEVICE_MQTT_41_003: [`productInfo` must be set before `mqtt._ensureAgentString` is invoked for the first time]*/ throw Error('Ensure you call setOption for productInfo before initiating any connection to IoT Hub'); } else { this._productInfo = options.productInfo; } } /* Codes_SRS_NODE_DEVICE_MQTT_06_001: [The `setOptions` method shall throw an `InvalidOperationError` if the method is called with token renewal options while using using cert or non renewal authentication.] */ if (options.tokenRenewal) { if (this._authenticationProvider.type === AuthenticationType.X509) { throw new errors.InvalidOperationError('cannot set token renewal options when using X509 authentication'); } else if (!this._authenticationProvider.setTokenRenewalValues) { throw new errors.InvalidOperationError('can only set token renewal options when using pre-shared key authentication'); } else { /* Codes_SRS_NODE_DEVICE_MQTT_06_002: [The authentication providers `setTokenRenewalValues` method shall be invoked with the values provided in the tokenRenewal option.] */ this._authenticationProvider.setTokenRenewalValues(options.tokenRenewal.tokenValidTimeInSeconds, options.tokenRenewal.tokenRenewalMarginInSeconds); } } this._mqtt.setOptions(options); if (!options.cert) { if (done) done(null); } else { /*Codes_SRS_NODE_DEVICE_MQTT_16_069: [The `setOptions` method shall obtain the current credentials by calling `getDeviceCredentials` on the `AuthenticationProvider` passed to the constructor as an argument.]*/ this._authenticationProvider.getDeviceCredentials((err, _credentials) => { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_16_070: [The `setOptions` method shall call its callback with the error returned by `getDeviceCredentials` if it fails to return the credentials.]*/ if (done) done(err); } else { /*Codes_SRS_NODE_DEVICE_MQTT_16_012: [The `setOptions` method shall update the existing configuration of the MQTT transport with the content of the `options` object.]*/ (this._authenticationProvider as X509AuthenticationProvider).setX509Options(options); /*Codes_SRS_NODE_DEVICE_MQTT_16_013: [If a `done` callback function is passed as a argument, the `setOptions` method shall call it when finished with no arguments.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_014: [The `setOptions` method shall not throw if the `done` argument is not passed.]*/ if (done) done(null); } }); } } /** * @private * @method module:azure-iot-device-mqtt.Mqtt.Mqtt#sendMethodResponse * @description Sends the response for a device method call to the service. * * @param {Object} response This is the `response` object that was * produced by the device client object when a * C2D device method call was received. * @param {Function} done The callback to be invoked when the response * has been sent to the service. * * @throws {Error} If the `response` parameter is falsy or does * not have the expected shape. */ sendMethodResponse(response: DeviceMethodResponse, done?: (err?: Error, result?: any) => void): void { // Codes_SRS_NODE_DEVICE_MQTT_13_001: [ sendMethodResponse shall throw an Error if response is falsy or does not conform to the shape defined by DeviceMethodResponse. ] if (!response) { throw new Error('Parameter \'response\' is falsy'); } if (!response.requestId) { throw new Error('Parameter \'response.requestId\' is falsy'); } if (typeof(response.requestId) !== 'string') { throw new Error('Parameter \'response.requestId\' is not a string.'); } if (!response.status) { throw new Error('Parameter \'response.status\' is falsy'); } if (typeof(response.status) !== 'number') { throw new Error('Parameter \'response.status\' is not a number'); } this._fsm.handle('sendMethodResponse', response, done); } /** * @private */ onDeviceMethod(methodName: string, callback: (methodRequest: MethodMessage, methodResponse: DeviceMethodResponse) => void): void { /*Codes_SRS_NODE_DEVICE_MQTT_16_066: [The `methodCallback` parameter shall be called whenever a `method_<methodName>` is emitted and device methods have been enabled.]*/ this.on('method_' + methodName, callback); } /** * @private */ enableC2D(callback: (err?: Error) => void): void { this._fsm.handle('enableC2D', callback); } /** * @private */ disableC2D(callback: (err?: Error) => void): void { this._fsm.handle('disableC2D', callback); } /** * @private */ enableInputMessages(callback: (err?: Error) => void): void { this._fsm.handle('enableInputMessages', callback); } /** * @private */ disableInputMessages(callback: (err?: Error) => void): void { this._fsm.handle('disableInputMessages', callback); } /** * @private */ enableMethods(callback: (err?: Error) => void): void { this._fsm.handle('enableMethods', callback); } /** * @private */ disableMethods(callback: (err?: Error) => void): void { this._fsm.handle('disableMethods', callback); } /** * @private */ getTwin(callback: (err?: Error, twin?: TwinProperties) => void): void { this._fsm.handle('getTwin', callback); } /** * @private */ updateTwinReportedProperties(patch: any, callback: (err?: Error) => void): void { this._fsm.handle('updateTwinReportedProperties', patch, callback); } /** * @private */ enableTwinDesiredPropertiesUpdates(callback: (err?: Error) => void): void { this._fsm.handle('enableTwinDesiredPropertiesUpdates', callback); } /** * @private */ disableTwinDesiredPropertiesUpdates(callback: (err?: Error) => void): void { this._fsm.handle('disableTwinDesiredPropertiesUpdates', callback); } /** * @private */ sendOutputEvent(outputName: string, message: Message, done: (err?: Error, result?: results.MessageEnqueued) => void): void { debug('sendOutputEvent ' + JSON.stringify(message)); /*Codes_SRS_NODE_DEVICE_MQTT_18_035: [ The `sendOutputEvent` method shall call the publish method on `MqttBase`. ]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_039: [ The `sendOutputEvent` method shall use QoS level of 1. ]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_068: [ The `sendOutputEvent` method shall serialize the `outputName` property of the message as a key-value pair on the topic with the key `$.on`. ] */ this._fsm.handle('sendEvent', message, { '$.on': outputName }, (err, puback) => { if (err) { debugErrors('send error: ' + err); done(err); } else { debug('PUBACK: ' + JSON.stringify(puback)); done(null, puback); } }); } /** * @private */ sendOutputEventBatch(_outputName: string, _messages: Message[], _done: (err?: Error, result?: results.MessageEnqueued) => void): void { /*Codes_SRS_NODE_DEVICE_MQTT_18_051: [ `sendOutputEventBatch` shall throw a `NotImplementedError` exception. ]*/ throw new errors.NotImplementedError('MQTT Transport does not support batching yet'); } protected _getBaseTransportConfig(credentials: TransportConfig): MqttBaseTransportConfig { let clientId: string; /*Codes_SRS_NODE_DEVICE_MQTT_18_052: [ If a `moduleId` is specified in the connection string, the Mqtt constructor shall initialize the `clientId` property of the `config` object to '<deviceId>/<moduleId>'. ]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_053: [ If a `moduleId` is not specified in the connection string, the Mqtt constructor shall initialize the `clientId` property of the `config` object to '<deviceId>'. ]*/ if (credentials.moduleId) { clientId = credentials.deviceId + '/' + credentials.moduleId; } else { clientId = credentials.deviceId; } /*Codes_SRS_NODE_DEVICE_MQTT_41_015: [If a modelId is provided, the device should use the PnP API String] */ const apiVersionString = endpoint.versionQueryString(); /*Codes_SRS_NODE_DEVICE_MQTT_16_016: [If the connection string does not specify a `gatewayHostName` value, the Mqtt constructor shall initialize the `uri` property of the `config` object to `mqtts://<host>`.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_054: [If a `gatewayHostName` is specified in the connection string, the Mqtt constructor shall initialize the `uri` property of the `config` object to `mqtts://<gatewayhostname>`. ]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_055: [The Mqtt constructor shall initialize the `username` property of the `config` object to '<host>/<clientId>/api-version=<version>&DeviceClientType=<agentString>'. ]*/ /*Codes_SRS_NODE_DEVICE_MQTT_41_002: [The MQTT constructor shall append the productInfo to the `username` property of the `config` object.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_41_014: [For a Plug and Play Device the modelId should be included as `&modelId=<DEVICE’s MODEL ID>` after the api-version]*/ const baseConfig: MqttBaseTransportConfig = { uri: 'mqtts://' + (credentials.gatewayHostName || credentials.host), username: credentials.host + '/' + clientId + '/' + apiVersionString + this._mid + '&DeviceClientType=' + encodeURIComponent(this._userAgentString), clientId: clientId, sharedAccessSignature: credentials.sharedAccessSignature, x509: credentials.x509 }; return baseConfig; } protected _configureEndpoints(credentials: TransportConfig): void { if (credentials.moduleId) { this._topicTelemetryPublish = endpoint.moduleEventPath(credentials.deviceId, credentials.moduleId).substring(1) + '/'; } else { this._topicTelemetryPublish = endpoint.deviceEventPath(credentials.deviceId).substring(1) + '/'; } debug('topic publish: ' + this._topicTelemetryPublish); // MQTT topics to subscribe to this._topics = {}; this._topics.method = { name: '$iothub/methods/POST/#', subscribeInProgress: false, subscribed: false, topicMatchRegex: /^\$iothub\/methods\/POST\/.*$/g, handler: this._onDeviceMethod.bind(this) }; if (credentials.moduleId) { this._topics.inputMessage = { name: endpoint.moduleInputMessagePath(credentials.deviceId, credentials.moduleId).substring(1) + '/#', subscribeInProgress: false, subscribed: false, topicMatchRegex: /^devices\/.*\/modules\/.*\/inputs\/.*\/.*$/g, handler: this._onInputMessage.bind(this) }; debug('inputMessage topic subscribe: ' + this._topics.inputMessage.name); } else { this._topics.message = { name: endpoint.deviceMessagePath(credentials.deviceId).substring(1) + '/#', subscribeInProgress: false, subscribed: false, topicMatchRegex: /^devices\/.*\/messages\/devicebound\/.*$/g, handler: this._onC2DMessage.bind(this) }; debug('message topic subscribe: ' + this._topics.message.name); } } private _setupSubscription(topic: TopicDescription, qos: 0 | 1, callback: (err?: Error) => void): void { debug('subscribe: ' + JSON.stringify(topic)); topic.subscribeInProgress = true; /*Codes_SRS_NODE_DEVICE_MQTT_16_049: [`enableC2D` shall subscribe to the MQTT topic for messages with a QoS of `1`.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_040: [`enableMethods` shall subscribe to the MQTT topic for direct methods.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_061: [`enableInputMessages` shall subscribe to the MQTT topic for inputMessages. ]*/ this._mqtt.subscribe(topic.name, { qos: qos }, (err) => { topic.subscribeInProgress = false; topic.subscribed = true; /*Codes_SRS_NODE_DEVICE_MQTT_16_050: [`enableC2D` shall call its callback with no arguments when the `SUBACK` packet is received.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_051: [`enableMethods` shall call its callback with no arguments when the `SUBACK` packet is received.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_062: [`enableInputMessages` shall call its callback with no arguments when the `SUBACK` packet is received. ]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_052: [`enableC2D` shall call its callback with an `Error` if subscribing to the topic fails.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_053: [`enableMethods` shall call its callback with an `Error` if subscribing to the topic fails.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_063: [`enableInputMessages` shall call its callback with an `Error` if subscribing to the topic fails. ]*/ this._ignoreConnectionClosedInErrorCallback(callback)(err); }); } private _removeSubscription(topic: TopicDescription, callback: (err?: Error) => void): void { debug('unsubscribe ' + JSON.stringify(topic)); /*Codes_SRS_NODE_DEVICE_MQTT_16_042: [`disableC2D` shall unsubscribe from the topic for C2D messages.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_045: [`disableMethods` shall unsubscribe from the topic for direct methods.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_065: [`disableInputMessages` shall unsubscribe from the topic for inputMessages. ]*/ this._mqtt.unsubscribe(topic.name, (err) => { topic.subscribed = !!err; // this sets the topic.subscribed to false if the unsubscribe is successful. /*Codes_SRS_NODE_DEVICE_MQTT_16_054: [`disableC2D` shall call its callback with no arguments when the `UNSUBACK` packet is received.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_055: [`disableMethods` shall call its callback with no arguments when the `UNSUBACK` packet is received.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_066: [`disableInputMessages` shall call its callback with no arguments when the `UNSUBACK` packet is received. ]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_043: [`disableC2D` shall call its callback with an `Error` if an error is received while unsubscribing.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_046: [`disableMethods` shall call its callback with an `Error` if an error is received while unsubscribing.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_067: [ `disableInputMessages` shall call its callback with an `Error` if an error is received while unsubscribing. ]*/ this._ignoreConnectionClosedInErrorCallback(callback)(err); }); } private _dispatchMqttMessage(topic: string, payload: any): void { debug('message received on ' + topic); debug(JSON.stringify(payload ? payload.toString() : null)); // dispatch the message to either the c2d message handler or the device method handler. // finding out which topic we should dispatch the call to is done by running the regex for each topic in the this._topics dictionary // after searching for the topic with regexes, targetTopic will contain the entry of the this._topics dictionary that corresponds to the topic passed as argument. let targetTopic = null; Object.keys(this._topics).some((topicIndex) => { // Turns out regexes are stateful. We need to reset the search index back to // the beginning every time we use it. const theTopic = this._topics[topicIndex]; theTopic.topicMatchRegex.lastIndex = 0; if (theTopic.topicMatchRegex.test(topic)) { targetTopic = theTopic; } return targetTopic !== null; }); // we have now run through all regexes in the this._topics table but we're still not sure we found something if (targetTopic) { // if the targetTopic is truthy then it means one of the regex matched, therefore we can call its corresponding handler. targetTopic.handler(topic, payload); } } private _onC2DMessage(topic: string, payload: any): void { /*Codes_SRS_NODE_DEVICE_MQTT_RECEIVER_16_005: [When a message event is emitted, the parameter shall be of type Message]*/ const msg = new Message(payload); const topicParts = topic.split('/'); // Message properties are always the 5th segment of the topic if (topicParts[4]) { this._extractPropertiesFromTopicPart(topicParts[4], msg); } /*Codes_SRS_NODE_DEVICE_MQTT_RECEIVER_16_004: [If there is a listener for the message event, a message event shall be emitted for each message received.]*/ this.emit('message', msg); } private _extractPropertiesFromTopicPart(properties: string, msg: Message): void { const keyValuePairs = properties.split('&'); for (let i = 0; i < keyValuePairs.length; i++) { const keyValuePair = keyValuePairs[i].split('='); const k = decodeURIComponent(keyValuePair[0]); const v = decodeURIComponent(keyValuePair[1]); switch (k) { case '$.mid': /*Codes_SRS_NODE_DEVICE_MQTT_RECEIVER_16_008: [When a message is received, the receiver shall populate the generated `Message` object `messageId` with the value of the property `$.mid` serialized in the topic, if present.]*/ msg.messageId = v; break; case '$.to': /*Codes_SRS_NODE_DEVICE_MQTT_RECEIVER_16_009: [When a message is received, the receiver shall populate the generated `Message` object `to` with the value of the property `$.to` serialized in the topic, if present.]*/ msg.to = v; break; case '$.exp': /*Codes_SRS_NODE_DEVICE_MQTT_RECEIVER_16_010: [When a message is received, the receiver shall populate the generated `Message` object `expiryTimeUtc` with the value of the property `$.exp` serialized in the topic, if present.]*/ msg.expiryTimeUtc = v; break; case '$.cid': /*Codes_SRS_NODE_DEVICE_MQTT_RECEIVER_16_011: [When a message is received, the receiver shall populate the generated `Message` object `correlationId` with the value of the property `$.cid` serialized in the topic, if present.]*/ msg.correlationId = v; break; case '$.uid': /*Codes_SRS_NODE_DEVICE_MQTT_RECEIVER_16_012: [When a message is received, the receiver shall populate the generated `Message` object `userId` with the value of the property `$.uid` serialized in the topic, if present.]*/ msg.userId = v; break; case '$.ct': /*Codes_SRS_NODE_DEVICE_MQTT_RECEIVER_16_013: [When a message is received, the receiver shall populate the generated `Message` object `contentType` with the value of the property `$.ct` serialized in the topic, if present.]*/ msg.contentType = <any>v; break; case '$.ce': /*Codes_SRS_NODE_DEVICE_MQTT_RECEIVER_16_014: [When a message is received, the receiver shall populate the generated `Message` object `contentEncoding` with the value of the property `$.ce` serialized in the topic, if present.]*/ msg.contentEncoding = <any>v; break; default: /*Codes_SRS_NODE_DEVICE_MQTT_RECEIVER_16_007: [When a message is received, the receiver shall populate the generated `Message` object `properties` property with the user properties serialized in the topic.]*/ msg.properties.add(k, v); break; } } } private _onInputMessage(topic: string, payload: any): void { /*Codes_SRS_NODE_DEVICE_MQTT_18_056: [ When an `inputMessage` event is emitted, the first parameter shall be the inputName and the second parameter shall be of type `Message`. ]*/ const msg = new Message(payload); /*Codes_SRS_NODE_DEVICE_MQTT_18_058: [ When an `inputMessage` event is received, Mqtt shall extract the inputName from the topic according to the following convention: 'devices/<deviceId>/modules/<moduleId>/inputs/<inputName>' ]*/ const topicParts = topic.split('/'); if (topicParts[6]) { this._extractPropertiesFromTopicPart(topicParts[6], msg); } const inputName: string = topicParts[5]; /*Codes_SRS_NODE_DEVICE_MQTT_18_057: [ An `inputMessage` event shall be emitted for each message received. ]*/ this.emit('inputMessage', inputName, msg); } private _onDeviceMethod(topic: string, payload: any): void { // The topic name looks like this: // $iothub/methods/POST/{method name}?$rid={request id}&{serialized properties} // We parse out the message. /* Codes_SRS_NODE_DEVICE_MQTT_RECEIVER_13_005: [ When a method_<METHOD NAME> event is emitted the parameter shall conform to the shape as defined by the interface specified below: interface StringMap { [key: string]: string; } interface MethodMessage { methods: { methodName: string; }; requestId: string; properties: StringMap; body: Buffer; verb: string; } ]*/ const methodMessage = _parseMessage(topic, payload); if (methodMessage) { // Codes_SRS_NODE_DEVICE_MQTT_RECEIVER_13_003: [ If there is a listener for the method event, a method_<METHOD NAME> event shall be emitted for each message received. ] // we emit a message for the event 'method_{method name}' this.emit('method_' + methodMessage.methods.methodName, methodMessage); } } private _getEventTopicFromMessage(message: Message, extraSystemProperties?: { [key: string]: string }): string { const DT_SUBJECT = 'dt-subject'; /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_008: [The `sendEvent` method shall use a topic formatted using the following convention: `devices/<deviceId>/messages/events/`.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_036: [ If a `moduleId` was not specified in the transport connection, the `sendOutputEvent` method shall use a topic formatted using the following convention: `devices/<deviceId>/messages/events/`. ]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_037: [ If a `moduleId` was specified in the transport connection, the `sendOutputEvent` method shall use a topic formatted using the following convention: `devices/<deviceId>/<moduleId>/messages/events/`. ]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_034: [ If the connection string specifies a `moduleId` value, the `sendEvent` method shall use a topic formatted using the following convention: `devices/<deviceId>/<moduleId>/messages/events/` ]*/ let topic = this._topicTelemetryPublish; const systemProperties: { [key: string]: string } = extraSystemProperties || {}; /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_011: [The `sendEvent` method shall serialize the `messageId` property of the message as a key-value pair on the topic with the key `$.mid`.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_040: [ The `sendOutputEvent` method shall serialize the `messageId` property of the message as a key-value pair on the topic with the key `$.mid`. ]*/ if (message.messageId) systemProperties['$.mid'] = message.messageId; /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_012: [The `sendEvent` method shall serialize the `correlationId` property of the message as a key-value pair on the topic with the key `$.cid`.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_041: [ The `sendOutputEvent` method shall serialize the `correlationId` property of the message as a key-value pair on the topic with the key `$.cid`. ]*/ if (message.correlationId) systemProperties['$.cid'] = message.correlationId; /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_013: [The `sendEvent` method shall serialize the `userId` property of the message as a key-value pair on the topic with the key `$.uid`.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_042: [ The `sendOutputEvent` method shall serialize the `userId` property of the message as a key-value pair on the topic with the key `$.uid`. ]*/ if (message.userId) systemProperties['$.uid'] = message.userId; /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_014: [The `sendEvent` method shall serialize the `to` property of the message as a key-value pair on the topic with the key `$.to`.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_043: [ The `sendOutputEvent` method shall serialize the `to` property of the message as a key-value pair on the topic with the key `$.to`. ]*/ if (message.to) systemProperties['$.to'] = message.to; /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_015: [The `sendEvent` method shall serialize the `expiryTimeUtc` property of the message as a key-value pair on the topic with the key `$.exp`.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_044: [ The `sendOutputEvent` method shall serialize the `expiryTimeUtc` property of the message as a key-value pair on the topic with the key `$.exp`. ]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_084: [The `sendEvent` method shall serialize the `contentType` property of the message as a key-value pair on the topic with the key `$.ct`.]*/ if (message.contentType) systemProperties['$.ct'] = <string>message.contentType; /*Codes_SRS_NODE_DEVICE_MQTT_16_083: [The `sendEvent` method shall serialize the `contentEncoding` property of the message as a key-value pair on the topic with the key `$.ce`.]*/ if (message.contentEncoding) systemProperties['$.ce'] = <string>message.contentEncoding; if (message.interfaceId) systemProperties['$.ifid'] = message.interfaceId; if (message.expiryTimeUtc) { const expiryString = message.expiryTimeUtc instanceof Date ? message.expiryTimeUtc.toISOString() : message.expiryTimeUtc; systemProperties['$.exp'] = (expiryString || undefined); } const sysPropString = querystring.stringify(systemProperties); topic += sysPropString; /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_009: [If the message has properties, the property keys and values shall be uri-encoded, then serialized and appended at the end of the topic with the following convention: `<key>=<value>&<key2>=<value2>&<key3>=<value3>(...)`.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_038: [ If the outputEvent message has properties, the property keys and values shall be uri-encoded, then serialized and appended at the end of the topic with the following convention: `<key>=<value>&<key2>=<value2>&<key3>=<value3>(...)`. ]*/ if (message.properties && message.properties.count() > 0) { for (let i = 0; i < message.properties.count(); i++) { if (i > 0 || sysPropString) topic += '&'; const keyName = message.properties.propertyList[i].key; topic += encodeURIComponent(((keyName === DT_SUBJECT) ? ('$.sub') : (keyName))) + '=' + encodeURIComponent(message.properties.propertyList[i].value); } } return topic; } private _ensureAgentString(done: () => void): void { if (this._userAgentString) { done(); } else { const customInfo = (this._productInfo) ? this._productInfo : ''; getUserAgentString(customInfo, (agent) => { this._userAgentString = agent; done(); }); } } // // We encountered an issue where a closed error was raised which some application handlers would respond to by // calling closed. This would then deadlock behind the code currently executing in mqttjs's close code. // The best solution that could happen exclusively inside the SDK code would be to drop the close error because // we KNOW that a disconnect would be raised after mqttjs finishes the close 'rundown'. // private _ignoreConnectionClosedInErrorCallback(callback: (err?: Error, ...args: any[]) => void): (err?: Error, ...args: any[]) => void { return (err: Error, ...args: any[]) => { if (err?.name === 'Error' && err?.message === 'Connection closed') { debug('Mqtt subscribe/unsubscribe operation failed due to MQTT.js connection closed error. MqttBase will handle this when MQTT.js emits the close event.'); return; } callback(err, ...args); }; } } /** * @private */ interface TopicDescription { name: string; subscribeInProgress: boolean; subscribed: boolean; topicMatchRegex: RegExp; handler: (topic: string, payload: any) => void; } /** * @private */ class MethodDescription { methodName: string; verb: string; } /** * @private */ class MethodMessageImpl implements MethodMessage { methods: MethodDescription; requestId: string; properties: { [key: string]: string }; body: Buffer; } function _parseMessage(topic: string, body: any): MethodMessage { let url: URL.Url; let path: string[]; let query: any; try { url = URL.parse(topic); path = url.path.split('/'); query = querystring.parse(url.query as string); } catch (err) { debugErrors('Could not parse topic for received message: ' + topic); debugErrors('Error is ' + err); return undefined; } // if the topic has a querystring then 'path' will include it; so // we strip it out const lastPathComponent = path[path.length - 1]; if (lastPathComponent.indexOf('?') !== -1) { path[path.length - 1] = lastPathComponent.substr( 0, lastPathComponent.indexOf('?') ); } if (path.length > 0 && path[0] === '$iothub') { const message = new MethodMessageImpl(); if (path.length > 1 && path[1].length > 0) { // create an object for the module; for example, $iothub/twin/... // would result in there being a message.twin object const mod = message[path[1]] = new MethodDescription(); // populates the request ID if there is one if (query.$rid) { message.requestId = query.$rid; } // parse the other properties properties (excluding $rid) message.properties = query; delete message.properties.$rid; // save the body message.body = body; // parse the verb if (path.length > 2 && path[2].length > 0) { mod.verb = path[2]; // This is a topic that looks like this: // $iothub/methods/POST/{method name}?$rid={request id}&{serialized properties} // We parse the method name out. if (path.length > 3 && path[3].length > 0) { mod.methodName = path[3]; } else { // The service published a message on a strange topic name. This is // probably a service bug. At any rate we don't know what to do with // this strange topic so we throw. throw new Error('Device method call\'s MQTT topic name does not include the method name.'); } } } return message; } return undefined; }