int SharedCrtResourceManager::establishConnection()

in source/SharedCrtResourceManager.cpp [225:350]


int SharedCrtResourceManager::establishConnection(const PlainConfig &config)
{
    if (!locateCredentials(config))
    {
        LOGM_ERROR(
            TAG,
            "*** %s: Failed to find file(s) with correct permissions required for establishing the MQTT connection ***",
            DeviceClient::DC_FATAL_ERROR);
        return SharedCrtResourceManager::ABORT;
    }
    auto clientConfigBuilder = MqttClientConnectionConfigBuilder(config.cert->c_str(), config.key->c_str());
    clientConfigBuilder.WithEndpoint(config.endpoint->c_str());
    clientConfigBuilder.WithCertificateAuthority(config.rootCa->c_str());
    clientConfigBuilder.WithSdkName(SharedCrtResourceManager::BINARY_NAME);
    clientConfigBuilder.WithSdkVersion(DEVICE_CLIENT_VERSION);

    auto clientConfig = clientConfigBuilder.Build();

    if (!clientConfig)
    {
        LOGM_ERROR(
            TAG,
            "MQTT Client Configuration initialization failed with error: %s",
            ErrorDebugString(clientConfig.LastError()));
        return ABORT;
    }

    connection = mqttClient->NewConnection(clientConfig);

    if (!*connection)
    {
        LOGM_ERROR(TAG, "MQTT Connection Creation failed with error: %s", ErrorDebugString(connection->LastError()));
        return ABORT;
    }

    promise<int> connectionCompletedPromise;

    /*
     * This will execute when an mqtt connect has completed or failed.
     */
    auto onConnectionCompleted = [&](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) {
        if (errorCode)
        {
            LOGM_ERROR(TAG, "MQTT Connection failed with error: %s", ErrorDebugString(errorCode));
            if (AWS_ERROR_MQTT_UNEXPECTED_HANGUP == errorCode)
            {
                LOG_ERROR(
                    TAG,
                    "*** Did you make sure you are using valid certificate with recommended policy attached to it? "
                    "Please refer README->Fleet Provisioning Feature section for more details on recommended policies "
                    "for AWS IoT Device Client. ***");
            }
            connectionCompletedPromise.set_value(errorCode);
        }
        else
        {
            LOGM_INFO(TAG, "MQTT connection established with return code: %d", returnCode);
            connectionCompletedPromise.set_value(0);
        }
    };

    /*
     * Invoked when a disconnect message has completed.
     */
    auto onDisconnect = [&](Mqtt::MqttConnection & /*conn*/) {
        {
            LOG_INFO(TAG, "MQTT Connection is now disconnected");
            connectionClosedPromise.set_value();
        }
    };

    /*
     * Invoked when connection is interrupted.
     */
    auto OnConnectionInterrupted = [&](Mqtt::MqttConnection &, int errorCode) {
        {
            if (errorCode)
            {
                LOGM_ERROR(
                    TAG,
                    "MQTT Connection interrupted with error: `%s`. Device Client will retry connection until it is "
                    "successfully connected to the core. ",
                    ErrorDebugString(errorCode));
            }
        }
    };

    /*
     * Invoked when connection is resumed.
     */
    auto OnConnectionResumed = [&](Mqtt::MqttConnection &, int returnCode, bool) {
        {
            LOGM_INFO(TAG, "MQTT connection resumed with return code: %d", returnCode);
        }
    };

    connection->OnConnectionCompleted = move(onConnectionCompleted);
    connection->OnDisconnect = move(onDisconnect);
    connection->OnConnectionInterrupted = move(OnConnectionInterrupted);
    connection->OnConnectionResumed = move(OnConnectionResumed);

    LOGM_INFO(TAG, "Establishing MQTT connection with client id %s...", config.thingName->c_str());
    if (!connection->SetReconnectTimeout(15, 240))
    {
        LOG_ERROR(TAG, "Device Client is not able to set reconnection settings. Device Client will retry again.");
        return RETRY;
    }
    if (!connection->Connect(config.thingName->c_str(), false))
    {
        LOGM_ERROR(TAG, "MQTT Connection failed with error: %s", ErrorDebugString(connection->LastError()));
        return RETRY;
    }

    int connectionStatus = connectionCompletedPromise.get_future().get();

    if (SharedCrtResourceManager::SUCCESS == connectionStatus)
    {
        LOG_INFO(TAG, "Shared MQTT connection is ready!");
        return SharedCrtResourceManager::SUCCESS;
    }
    else
    {
        LOG_ERROR(TAG, "Failed to establish shared MQTT connection, but will attempt retry...");
        return SharedCrtResourceManager::RETRY;
    }
}