in source/SharedCrtResourceManager.cpp [225:350]
int SharedCrtResourceManager::establishConnection(const PlainConfig &config)
{
if (!locateCredentials(config))
{
LOGM_ERROR(
TAG,
"*** %s: Failed to find file(s) with correct permissions required for establishing the MQTT connection ***",
DeviceClient::DC_FATAL_ERROR);
return SharedCrtResourceManager::ABORT;
}
auto clientConfigBuilder = MqttClientConnectionConfigBuilder(config.cert->c_str(), config.key->c_str());
clientConfigBuilder.WithEndpoint(config.endpoint->c_str());
clientConfigBuilder.WithCertificateAuthority(config.rootCa->c_str());
clientConfigBuilder.WithSdkName(SharedCrtResourceManager::BINARY_NAME);
clientConfigBuilder.WithSdkVersion(DEVICE_CLIENT_VERSION);
auto clientConfig = clientConfigBuilder.Build();
if (!clientConfig)
{
LOGM_ERROR(
TAG,
"MQTT Client Configuration initialization failed with error: %s",
ErrorDebugString(clientConfig.LastError()));
return ABORT;
}
connection = mqttClient->NewConnection(clientConfig);
if (!*connection)
{
LOGM_ERROR(TAG, "MQTT Connection Creation failed with error: %s", ErrorDebugString(connection->LastError()));
return ABORT;
}
promise<int> connectionCompletedPromise;
/*
* This will execute when an mqtt connect has completed or failed.
*/
auto onConnectionCompleted = [&](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) {
if (errorCode)
{
LOGM_ERROR(TAG, "MQTT Connection failed with error: %s", ErrorDebugString(errorCode));
if (AWS_ERROR_MQTT_UNEXPECTED_HANGUP == errorCode)
{
LOG_ERROR(
TAG,
"*** Did you make sure you are using valid certificate with recommended policy attached to it? "
"Please refer README->Fleet Provisioning Feature section for more details on recommended policies "
"for AWS IoT Device Client. ***");
}
connectionCompletedPromise.set_value(errorCode);
}
else
{
LOGM_INFO(TAG, "MQTT connection established with return code: %d", returnCode);
connectionCompletedPromise.set_value(0);
}
};
/*
* Invoked when a disconnect message has completed.
*/
auto onDisconnect = [&](Mqtt::MqttConnection & /*conn*/) {
{
LOG_INFO(TAG, "MQTT Connection is now disconnected");
connectionClosedPromise.set_value();
}
};
/*
* Invoked when connection is interrupted.
*/
auto OnConnectionInterrupted = [&](Mqtt::MqttConnection &, int errorCode) {
{
if (errorCode)
{
LOGM_ERROR(
TAG,
"MQTT Connection interrupted with error: `%s`. Device Client will retry connection until it is "
"successfully connected to the core. ",
ErrorDebugString(errorCode));
}
}
};
/*
* Invoked when connection is resumed.
*/
auto OnConnectionResumed = [&](Mqtt::MqttConnection &, int returnCode, bool) {
{
LOGM_INFO(TAG, "MQTT connection resumed with return code: %d", returnCode);
}
};
connection->OnConnectionCompleted = move(onConnectionCompleted);
connection->OnDisconnect = move(onDisconnect);
connection->OnConnectionInterrupted = move(OnConnectionInterrupted);
connection->OnConnectionResumed = move(OnConnectionResumed);
LOGM_INFO(TAG, "Establishing MQTT connection with client id %s...", config.thingName->c_str());
if (!connection->SetReconnectTimeout(15, 240))
{
LOG_ERROR(TAG, "Device Client is not able to set reconnection settings. Device Client will retry again.");
return RETRY;
}
if (!connection->Connect(config.thingName->c_str(), false))
{
LOGM_ERROR(TAG, "MQTT Connection failed with error: %s", ErrorDebugString(connection->LastError()));
return RETRY;
}
int connectionStatus = connectionCompletedPromise.get_future().get();
if (SharedCrtResourceManager::SUCCESS == connectionStatus)
{
LOG_INFO(TAG, "Shared MQTT connection is ready!");
return SharedCrtResourceManager::SUCCESS;
}
else
{
LOG_ERROR(TAG, "Failed to establish shared MQTT connection, but will attempt retry...");
return SharedCrtResourceManager::RETRY;
}
}