private _connectClient()

in common/transport/mqtt/src/mqtt_base.ts [393:470]


  /**
   * Creates the underlying MQTT client through `mqttProvider.connect` and wires up its event listeners.
   *
   * Connection options are assembled from `this._config` (client id, username, credentials) and from the
   * optional `this._options` overrides (`ca`, `keepalive`, `mqtt.webSocketAgent`). Authentication uses the
   * shared access signature as the password when one is configured, otherwise the x509 cert/key/passphrase.
   *
   * @param callback Invoked with `(null, connack)` when the client emits `connect`, or with an error when the
   *                 client emits `error`, `close`, `offline` or `disconnect` before that. When the event carries
   *                 no error, a `NotConnectedError` is supplied instead.
   *
   * NOTE(review): the failure listeners are only removed once `connect` fires, so a single failed attempt that
   * emits several of these events will invoke `callback` more than once - confirm the caller tolerates that.
   */
  private _connectClient(callback: (err?: Error, connack?: any) => void): void {
    /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_002: [The `connect` method shall use the authentication parameters contained in the `config` argument to connect to the server.]*/
    const clientOptions: IClientOptions = {
      protocolId: 'MQTT',
      protocolVersion: 4,
      clean: this._config.clean || false,
      clientId: this._config.clientId,
      username: this._config.username,
      // Automatic reconnection is disabled here: reconnection is handled at a higher level.
      reconnectPeriod: 0,
      connectTimeout: 60 * 1000,
      /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_016: [The `connect` method shall configure the `keepalive` ping interval to 3 minutes by default since the Azure Load Balancer TCP Idle timeout default is 4 minutes.]*/
      keepalive: 180,
      reschedulePings: false
    };

    /*Codes_SRS_NODE_COMMON_MQTT_BASE_18_001: [The `connect` method shall set the `ca` option based on the `ca` string passed in the `options` structure via the `setOptions` function.]*/
    const userOptions = this._options;
    if (userOptions) {
      if (userOptions.ca) {
        clientOptions.ca = userOptions.ca;
      }

      /*Codes_SRS_NODE_COMMON_MQTT_BASE_41_001: [The `connect` method shall set the `keepalive` option based on the `keepalive` numeric value passed in the `options` structure via the `setOptions` function.]*/
      // Truthiness check: a falsy keepalive (including 0) leaves the 180 second default in place.
      if (userOptions.keepalive) {
        clientOptions.keepalive = userOptions.keepalive;
      }

      /*Codes_SRS_NODE_COMMON_MQTT_BASE_18_002: [The `connect` method shall set the `wsOptions.agent` option based on the `mqtt.webSocketAgent` object passed in the `options` structure via the `setOptions` function.]*/
      if (userOptions.mqtt && userOptions.mqtt.webSocketAgent) {
        clientOptions.wsOptions = {
          agent: userOptions.mqtt.webSocketAgent
        };
      }
    }

    const sharedAccessSignature = this._config.sharedAccessSignature;
    if (!sharedAccessSignature) {
      // No shared access signature configured: authenticate with the x509 material instead.
      const x509 = this._config.x509;
      clientOptions.cert = x509.cert;
      clientOptions.key = x509.key;
      // `passphrase` is consumed by the tls options but is not surfaced by the types definition, hence the cast.
      (clientOptions as any).passphrase = x509.passphrase;
    } else {
      clientOptions.password = sharedAccessSignature.toString();
      debug('username: ' + clientOptions.username);
      debug('uri:      ' + this._config.uri);
    }

    /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_003: [The `connect` method shall call the `done` callback with a standard javascript `Error` object if the connection failed.]*/
    // Events that are treated as a failed connection attempt while waiting for `connect`.
    const failureEvents = ['error', 'close', 'offline', 'disconnect'];
    const failureHandlers: { [eventName: string]: (error?: any) => void } = {};
    failureEvents.forEach((eventName) => {
      failureHandlers[eventName] = (error?: any) => {
        debugErrors(`received '${eventName}' from mqtt client`);
        debugErrors(' error supplied is: ' + this._errorDescription(error));
        // Some events carry no error object; substitute a generic one so the callback always gets an error.
        callback(error || new errors.NotConnectedError('Unable to establish a connection'));
      };
    });

    const mqttClient = this.mqttProvider.connect(this._config.uri, clientOptions);
    this._mqttClient = mqttClient;
    const tracker = new ExternalEventTracker(mqttClient);
    this._mqttTrackedListeners = tracker;

    tracker.addTrackedListener('message', this._messageCallback.bind(this));
    failureEvents.forEach((eventName) => {
      tracker.addTrackedListener(eventName, failureHandlers[eventName]);
    });

    const onConnected = (connack: any): void => {
      debug('Device is connected');
      debug('CONNACK: ' + JSON.stringify(connack));

      // The connection-phase failure handlers are no longer relevant once connected; detach them.
      failureEvents.forEach((eventName) => {
        this._mqttTrackedListeners.removeTrackedListener(eventName, failureHandlers[eventName]);
      });

      callback(null, connack);
    };
    tracker.addTrackedListener('connect', onConnected);
  }