in iothub_client/src/iothubtransport_mqtt_common.c [2797:2919]
/*
 * Advances the MQTT connection state machine by one step, based on
 * transport_data->mqttClientStatus.
 *
 * Nothing is done (and 0 is returned) when isDestroyCalled is set, or when the
 * current status matches none of the branches below.
 *
 *   NOT_CONNECTED (and isRecoverableError)
 *       Sends an MQTT CONNECT when no connection was attempted yet, when the
 *       retry-control query itself fails, or when it answers RETRY_NOW.
 *       Otherwise reports MU_FAILURE; on STOP_RETRYING the "retry expired"
 *       status callback is fired, at most once.
 *   EXECUTE_DISCONNECT
 *       Disconnects from the client.
 *   CONNECTING
 *       Disconnects and reports MU_FAILURE once the time elapsed since
 *       mqtt_connect_time exceeds connect_timeout_in_sec.
 *   CONNECTED
 *       For non-X509 credentials, disconnects once the time elapsed since
 *       mqtt_connect_time exceeds sas_token_expiry * SAS_REFRESH_MULTIPLIER,
 *       and flags the configured topics in topics_ToSubscribe.
 *
 * Returns 0 on success or when there was nothing to do, MU_FAILURE otherwise
 * (tick counter read failure, CONNECT send failure, retry deferred or expired,
 * CONNACK timeout). Note that a SAS-triggered disconnect still returns 0.
 */
static int UpdateMqttConnectionStateIfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data)
{
    int result = 0;

    // Make sure we're not destroying the object
    if (!transport_data->isDestroyCalled)
    {
        RETRY_ACTION retry_action = RETRY_ACTION_RETRY_LATER;

        if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_NOT_CONNECTED && transport_data->isRecoverableError)
        {
            // Note: in case retry_control_should_retry fails, the reconnection shall be attempted anyway (defaulting to policy IOTHUB_CLIENT_RETRY_IMMEDIATE).
            // The || chain short-circuits: the retry policy is not consulted at all before the first connection attempt.
            if (!transport_data->conn_attempted || retry_control_should_retry(transport_data->retry_control_handle, &retry_action) != 0 || retry_action == RETRY_ACTION_RETRY_NOW)
            {
                // Record the tick at which this connection attempt starts.
                if (tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->connectTick) != 0)
                {
                    transport_data->connectFailCount++;
                    result = MU_FAILURE;
                }
                else
                {
                    ResetConnectionIfNecessary(transport_data);

                    if (SendMqttConnectMsg(transport_data) != 0)
                    {
                        transport_data->connectFailCount++;
                        result = MU_FAILURE;
                    }
                    else
                    {
                        // CONNECT was sent; the CONNECTING branch below enforces the timeout on later calls.
                        transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_CONNECTING;
                        transport_data->connectFailCount = 0;
                        result = 0;
                    }
                }
            }
            else if (retry_action == RETRY_ACTION_STOP_RETRYING)
            {
                // send callback if retry expired (only once, guarded by isRetryExpiredCallbackCalled)
                if (!transport_data->isRetryExpiredCallbackCalled)
                {
                    transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED, transport_data->transport_ctx);
                    transport_data->isRetryExpiredCallbackCalled = true;
                }
                result = MU_FAILURE;
            }
            else if (retry_action == RETRY_ACTION_RETRY_LATER)
            {
                // Not time to retry yet: report failure without touching the connection.
                result = MU_FAILURE;
            }
        }
        else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_EXECUTE_DISCONNECT)
        {
            // Need to disconnect from client
            DisconnectFromClient(transport_data);
            result = 0;
        }
        else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTING)
        {
            // Still connecting: give up if the connect timeout has elapsed.
            tickcounter_ms_t current_time;

            if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
            {
                LogError("failed verifying MQTT_CLIENT_STATUS_CONNECTING timeout");
                result = MU_FAILURE;
            }
            else if ((current_time - transport_data->mqtt_connect_time) / 1000 > transport_data->connect_timeout_in_sec)
            {
                // Elapsed time is in milliseconds; the timeout is configured in seconds.
                LogError("mqtt_client timed out waiting for CONNACK: disconnecting MQTT connection");
                transport_data->currPacketState = PACKET_TYPE_ERROR;
                DisconnectFromClient(transport_data);
                result = MU_FAILURE;
            }
        }
        else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTED)
        {
            // We are connected and not being closed, so does SAS need to reconnect?
            tickcounter_ms_t current_time;

            if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
            {
                transport_data->connectFailCount++;
                result = MU_FAILURE;
            }
            else
            {
                IOTHUB_CREDENTIAL_TYPE cred_type = IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module);

                // If the credential type is not an x509 certificate then we shall renew the Sas_Token
                if (cred_type != IOTHUB_CREDENTIAL_TYPE_X509 && cred_type != IOTHUB_CREDENTIAL_TYPE_X509_ECC)
                {
                    uint64_t sas_token_expiry = IoTHubClient_Auth_Get_SasToken_Expiry(transport_data->authorization_module);

                    // Compare seconds since mqtt_connect_time against the expiry scaled by SAS_REFRESH_MULTIPLIER.
                    if ((current_time - transport_data->mqtt_connect_time) / 1000 > (sas_token_expiry*SAS_REFRESH_MULTIPLIER))
                    {
                        LogInfo("Disconnecting MQTT connection because SAS token expired.");
                        DisconnectFromClient(transport_data);

                        transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN, transport_data->transport_ctx);
                        transport_data->currPacketState = UNKNOWN_TYPE;

                        // Flag every topic that currently has a name configured;
                        // presumably these are re-subscribed after the next connect - confirm against the subscribe path.
                        if (transport_data->topic_MqttMessage != NULL)
                        {
                            transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC;
                        }
                        if (transport_data->topic_GetState != NULL)
                        {
                            transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
                        }
                        if (transport_data->topic_NotifyState != NULL)
                        {
                            transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
                        }
                        if (transport_data->topic_DeviceMethods != NULL)
                        {
                            transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
                        }
                        if (transport_data->topic_InputQueue != NULL)
                        {
                            transport_data->topics_ToSubscribe |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
                        }
                    }
                }
            }
        }
    }

    return result;
}