in common/transport/mqtt/src/mqtt_base.ts [393:470]
/**
 * Creates the underlying MQTT client from `this._config` / `this._options` and reports the outcome
 * of the connection attempt through `callback`.
 *
 * The method builds the client options (protocol settings, credentials and the optional `ca`,
 * `keepalive` and WebSocket agent overrides), asks `this.mqttProvider` to connect to
 * `this._config.uri`, and registers tracked listeners on the resulting client.
 *
 * @param callback Completion callback, invoked at most once per call: with `(null, connack)` when the
 *                 client emits `connect`, or with an error when `error`, `close`, `offline` or
 *                 `disconnect` is emitted first. If the event carries no value, a
 *                 `NotConnectedError` is supplied instead.
 */
private _connectClient(callback: (err?: Error, connack?: any) => void): void {
  /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_002: [The `connect` method shall use the authentication parameters contained in the `config` argument to connect to the server.]*/
  const options: IClientOptions = {
    protocolId: 'MQTT',
    protocolVersion: 4,
    clean: this._config.clean || false,
    clientId: this._config.clientId,
    username: this._config.username,
    reconnectPeriod: 0, // Client will handle reconnection at the higher level.
    connectTimeout: 60 * 1000,
    /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_016: [The `connect` method shall configure the `keepalive` ping interval to 3 minutes by default since the Azure Load Balancer TCP Idle timeout default is 4 minutes.]*/
    keepalive: 180,
    reschedulePings: false
  };

  /*Codes_SRS_NODE_COMMON_MQTT_BASE_18_001: [The `connect` method shall set the `ca` option based on the `ca` string passed in the `options` structure via the `setOptions` function.]*/
  if (this._options) {
    if (this._options.ca) {
      options.ca = this._options.ca;
    }

    /*Codes_SRS_NODE_COMMON_MQTT_BASE_41_001: [The `connect` method shall set the `keepalive` option based on the `keepalive` numeric value passed in the `options` structure via the `setOptions` function.]*/
    if (this._options.keepalive) {
      options.keepalive = this._options.keepalive;
    }

    /*Codes_SRS_NODE_COMMON_MQTT_BASE_18_002: [The `connect` method shall set the `wsOptions.agent` option based on the `mqtt.webSocketAgent` object passed in the `options` structure via the `setOptions` function.]*/
    if (this._options.mqtt && this._options.mqtt.webSocketAgent) {
      options.wsOptions = {
        agent: this._options.mqtt.webSocketAgent
      };
    }
  }

  if (this._config.sharedAccessSignature) {
    options.password = this._config.sharedAccessSignature.toString();
    debug('username: ' + options.username);
    debug('uri: ' + this._config.uri);
  } else {
    // No shared access signature: authenticate with the x509 certificate material instead.
    // NOTE(review): assumes `this._config.x509` is defined whenever no shared access signature is configured - confirm against callers.
    options.cert = this._config.x509.cert;
    options.key = this._config.x509.key;
    (<any>options).passphrase = this._config.x509.passphrase; // forced to cast to any because passphrase is used by tls options but not surfaced by the types definition.
  }

  // Several failure events can be emitted for a single failed attempt (e.g. 'error' followed by 'close'),
  // and the failure listeners stay registered after the first one fires. This flag guarantees that
  // `callback` is only ever invoked once for this call.
  let callbackInvoked = false;

  const createErrorCallback = (eventName: string) => {
    return (error?: any): void => {
      // Always log, even for events that arrive after the outcome has already been reported.
      debugErrors('received \'' + eventName + '\' from mqtt client');
      debugErrors(' error supplied is: ' + this._errorDescription(error));
      if (callbackInvoked) {
        return;
      }
      callbackInvoked = true;
      const err = error || new errors.NotConnectedError('Unable to establish a connection');
      callback(err);
    };
  };

  /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_003: [The `connect` method shall call the `done` callback with a standard javascript `Error` object if the connection failed.]*/
  const errorCallback = createErrorCallback('error');
  const closeCallback = createErrorCallback('close');
  const offlineCallback = createErrorCallback('offline');
  const disconnectCallback = createErrorCallback('disconnect');

  this._mqttClient = this.mqttProvider.connect(this._config.uri, options);
  this._mqttTrackedListeners = new ExternalEventTracker(this._mqttClient);
  this._mqttTrackedListeners.addTrackedListener('message', this._messageCallback.bind(this));
  this._mqttTrackedListeners.addTrackedListener('error', errorCallback);
  this._mqttTrackedListeners.addTrackedListener('close', closeCallback);
  this._mqttTrackedListeners.addTrackedListener('offline', offlineCallback);
  this._mqttTrackedListeners.addTrackedListener('disconnect', disconnectCallback);
  this._mqttTrackedListeners.addTrackedListener('connect', (connack) => {
    debug('Device is connected');
    debug('CONNACK: ' + JSON.stringify(connack));
    // The connection attempt is over: the connect-time failure handlers are no longer relevant.
    this._mqttTrackedListeners.removeTrackedListener('error', errorCallback);
    this._mqttTrackedListeners.removeTrackedListener('close', closeCallback);
    this._mqttTrackedListeners.removeTrackedListener('offline', offlineCallback);
    this._mqttTrackedListeners.removeTrackedListener('disconnect', disconnectCallback);
    if (callbackInvoked) {
      // A failure was already reported for this attempt; do not report a second, contradictory outcome.
      return;
    }
    callbackInvoked = true;
    callback(null, connack);
  });
}