in provisioning_client/src/prov_transport_mqtt_common.c [910:1052]
/*
 * Runs one iteration of the MQTT provisioning transport state machine.
 *
 * handle: transport handle; it is cast to PROV_TRANSPORT_MQTT_INFO*.
 *         A NULL handle makes the call a no-op.
 *
 * Dispatch on mqtt_state:
 *   - MQTT_STATE_DISCONNECTED: calls create_connection(); success moves to
 *     MQTT_STATE_CONNECTING, failure puts both state fields in their ERROR value.
 *   - MQTT_STATE_CONNECTED: runs mqtt_client_dowork(), then subscribe_to_topic();
 *     success reports PROV_DEVICE_TRANSPORT_STATUS_CONNECTED through status_cb
 *     (when one is set) and moves to MQTT_STATE_SUBSCRIBING.
 *   - any other state except MQTT_STATE_IDLE: runs mqtt_client_dowork() and,
 *     if mqtt_state is then SUBSCRIBED or ERROR, dispatches on transport_state.
 *
 * Nothing is returned; failures are recorded in mqtt_state / transport_state
 * and reported to register_data_cb by the TRANSPORT_CLIENT_STATE_ERROR case.
 */
void prov_transport_common_mqtt_dowork(PROV_DEVICE_TRANSPORT_HANDLE handle)
{
    if (handle != NULL)
    {
        PROV_TRANSPORT_MQTT_INFO* mqtt_info = (PROV_TRANSPORT_MQTT_INFO*)handle;
        if (mqtt_info->mqtt_state == MQTT_STATE_DISCONNECTED)
        {
            if (create_connection(mqtt_info) != 0)
            {
                LogError("unable to create mqtt connection");
                mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
            }
            else
            {
                mqtt_info->mqtt_state = MQTT_STATE_CONNECTING;
            }
        }
        else if (mqtt_info->mqtt_state == MQTT_STATE_CONNECTED)
        {
            mqtt_client_dowork(mqtt_info->mqtt_client);
            if (subscribe_to_topic(mqtt_info) != 0)
            {
                LogError("Failure subscribing to topic");
                mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
            }
            else
            {
                /* Guard the callback like every other status_cb call site in this
                   function; a missing callback must not stop the state machine. */
                if (mqtt_info->status_cb != NULL)
                {
                    mqtt_info->status_cb(PROV_DEVICE_TRANSPORT_STATUS_CONNECTED, mqtt_info->retry_after_value, mqtt_info->status_ctx);
                }
                mqtt_info->mqtt_state = MQTT_STATE_SUBSCRIBING;
            }
        }
        else if (mqtt_info->mqtt_state != MQTT_STATE_IDLE)
        {
            /* mqtt_state is re-read after this call on purpose: the check below
               looks at the value left behind once the client has done its work.
               NOTE(review): presumably client callbacks update mqtt_state and
               transport_state during this call - confirm against the callbacks. */
            mqtt_client_dowork(mqtt_info->mqtt_client);
            if (mqtt_info->mqtt_state == MQTT_STATE_SUBSCRIBED || mqtt_info->mqtt_state == MQTT_STATE_ERROR)
            {
                switch (mqtt_info->transport_state)
                {
                    case TRANSPORT_CLIENT_STATE_REG_SEND:
                        /* Send the registration request, then wait in REG_SENT. */
                        if (send_register_message(mqtt_info) != 0)
                        {
                            mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                            mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                        }
                        else
                        {
                            mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_REG_SENT;
                        }
                        break;

                    case TRANSPORT_CLIENT_STATE_STATUS_SEND:
                        /* Send the operation status query, then wait in STATUS_SENT. */
                        if (send_operation_status_message(mqtt_info) != 0)
                        {
                            mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                            mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                        }
                        else
                        {
                            mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_STATUS_SENT;
                        }
                        break;

                    case TRANSPORT_CLIENT_STATE_REG_RECV:
                    case TRANSPORT_CLIENT_STATE_STATUS_RECV:
                    {
                        /* Both reply kinds are parsed by the same callback from payload_data. */
                        PROV_JSON_INFO* parse_info = mqtt_info->json_parse_cb(mqtt_info->payload_data, mqtt_info->json_ctx);
                        if (parse_info == NULL)
                        {
                            LogError("Unable to process registration reply.");
                            mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                            mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                        }
                        else
                        {
                            switch (parse_info->prov_status)
                            {
                                case PROV_DEVICE_TRANSPORT_STATUS_UNASSIGNED:
                                case PROV_DEVICE_TRANSPORT_STATUS_ASSIGNING:
                                    if (parse_info->operation_id == NULL)
                                    {
                                        LogError("Failure operation Id invalid");
                                        mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                                        mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                                    }
                                    /* The operation id is copied only when none is stored yet;
                                       an id that is already stored is kept as is. */
                                    else if (mqtt_info->operation_id == NULL && mallocAndStrcpy_s(&mqtt_info->operation_id, parse_info->operation_id) != 0)
                                    {
                                        LogError("Failure copying operation Id");
                                        mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                                        mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                                    }
                                    else
                                    {
                                        if (mqtt_info->status_cb != NULL)
                                        {
                                            mqtt_info->status_cb(parse_info->prov_status, mqtt_info->retry_after_value, mqtt_info->status_ctx);
                                        }
                                        mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
                                    }
                                    break;

                                case PROV_DEVICE_TRANSPORT_STATUS_ASSIGNED:
                                    /* Hand the parsed registration result to the caller. */
                                    mqtt_info->register_data_cb(PROV_DEVICE_TRANSPORT_RESULT_OK, parse_info->authorization_key, parse_info->iothub_uri, parse_info->device_id, mqtt_info->user_ctx);
                                    mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
                                    break;

                                case PROV_DEVICE_TRANSPORT_STATUS_TRANSIENT:
                                    /* Neither state field is changed for this status. */
                                    break;

                                case PROV_DEVICE_TRANSPORT_STATUS_ERROR:
                                default:
                                    LogError("Unable to process message reply.");
                                    mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                                    mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                                    break;
                            }
                            /* parse_info is released on every path of the inner switch. */
                            free_json_parse_info(parse_info);
                        }
                        break;
                    }

                    case TRANSPORT_CLIENT_STATE_TRANSIENT:
                        if (mqtt_info->status_cb != NULL)
                        {
                            mqtt_info->status_cb(PROV_DEVICE_TRANSPORT_STATUS_TRANSIENT, mqtt_info->retry_after_value, mqtt_info->status_ctx);
                        }
                        mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
                        break;

                    case TRANSPORT_CLIENT_STATE_ERROR:
                        /* Report the failure once, then park both state fields in IDLE
                           so later calls to this function fall through without work. */
                        mqtt_info->register_data_cb(PROV_DEVICE_TRANSPORT_RESULT_ERROR, NULL, NULL, NULL, mqtt_info->user_ctx);
                        mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
                        mqtt_info->mqtt_state = MQTT_STATE_IDLE;
                        break;

                    case TRANSPORT_CLIENT_STATE_REG_SENT:
                    case TRANSPORT_CLIENT_STATE_STATUS_SENT:
                        /* Request already sent; nothing to do in this call. */
                        break;

                    case TRANSPORT_CLIENT_STATE_IDLE:
                    default:
                        break;
                }
            }
        }
    }
}