in src/mqtt_client.c [412:470]
static void onOpenComplete(void* context, IO_OPEN_RESULT open_result)
{
MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
if (mqtt_client != NULL)
{
if (open_result == IO_OPEN_OK && !(mqtt_client->mqtt_status & MQTT_STATUS_SOCKET_CONNECTED))
{
mqtt_client->packetState = CONNECT_TYPE;
mqtt_client->mqtt_status |= MQTT_STATUS_SOCKET_CONNECTED;
STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
// Send the Connect packet
BUFFER_HANDLE connPacket = mqtt_codec_connect(&mqtt_client->mqttOptions, trace_log);
if (connPacket == NULL)
{
LogError("Error: mqtt_codec_connect failed");
}
else
{
size_t size = BUFFER_length(connPacket);
/*Codes_SRS_MQTT_CLIENT_07_009: [On success mqtt_client_connect shall send the MQTT CONNECT to the endpoint.]*/
if (sendPacketItem(mqtt_client, BUFFER_u_char(connPacket), size) != 0)
{
LogError("Error: failure sending connect packet");
// Set the status to pending close because we connot continue
// with CONN failing to send
if (mqtt_client->fnOnErrorCallBack)
{
mqtt_client->fnOnErrorCallBack(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR, mqtt_client->errorCBCtx);
}
mqtt_client->mqtt_status |= MQTT_STATUS_PENDING_CLOSE;
}
else
{
log_outgoing_trace(mqtt_client, trace_log);
}
BUFFER_delete(connPacket);
}
if (trace_log != NULL)
{
STRING_delete(trace_log);
}
}
else
{
LogError("Error: failure opening connection to endpoint");
if (!(mqtt_client->mqtt_status & MQTT_STATUS_SOCKET_CONNECTED) && mqtt_client->fnOnErrorCallBack)
{
mqtt_client->fnOnErrorCallBack(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR, mqtt_client->errorCBCtx);
}
mqtt_client->mqtt_status |= MQTT_STATUS_PENDING_CLOSE;
}
}
else
{
LogError("Error: mqtt_client is NULL");
}
}