void prov_transport_common_mqtt_dowork()

in provisioning_client/src/prov_transport_mqtt_common.c [910:1052]


void prov_transport_common_mqtt_dowork(PROV_DEVICE_TRANSPORT_HANDLE handle)
{
    if (handle != NULL)
    {
        PROV_TRANSPORT_MQTT_INFO* mqtt_info = (PROV_TRANSPORT_MQTT_INFO*)handle;
        if (mqtt_info->mqtt_state == MQTT_STATE_DISCONNECTED)
        {
            if (create_connection(mqtt_info) != 0)
            {
                LogError("unable to create mqtt connection");
                mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
            }
            else
            {
                mqtt_info->mqtt_state = MQTT_STATE_CONNECTING;
            }
        }
        else if (mqtt_info->mqtt_state == MQTT_STATE_CONNECTED)
        {
            mqtt_client_dowork(mqtt_info->mqtt_client);
            if (subscribe_to_topic(mqtt_info) != 0)
            {
                LogError("Failure subscribing to topic");
                mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
            }
            else
            {
                mqtt_info->status_cb(PROV_DEVICE_TRANSPORT_STATUS_CONNECTED, mqtt_info->retry_after_value, mqtt_info->status_ctx);
                mqtt_info->mqtt_state = MQTT_STATE_SUBSCRIBING;
            }
        }
        else if (mqtt_info->mqtt_state != MQTT_STATE_IDLE)
        {
            mqtt_client_dowork(mqtt_info->mqtt_client);
            if (mqtt_info->mqtt_state == MQTT_STATE_SUBSCRIBED || mqtt_info->mqtt_state == MQTT_STATE_ERROR)
            {
                switch (mqtt_info->transport_state)
                {
                    case TRANSPORT_CLIENT_STATE_REG_SEND:
                        if (send_register_message(mqtt_info) != 0)
                        {
                            mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                            mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                        }
                        else
                        {
                            mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_REG_SENT;
                        }
                        break;
                    case TRANSPORT_CLIENT_STATE_STATUS_SEND:
                        if (send_operation_status_message(mqtt_info) != 0)
                        {
                            mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                            mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                        }
                        else
                        {
                            mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_STATUS_SENT;
                        }
                        break;

                    case TRANSPORT_CLIENT_STATE_REG_RECV:
                    case TRANSPORT_CLIENT_STATE_STATUS_RECV:
                    {
                        PROV_JSON_INFO* parse_info = mqtt_info->json_parse_cb(mqtt_info->payload_data, mqtt_info->json_ctx);
                        if (parse_info == NULL)
                        {
                            LogError("Unable to process registration reply.");
                            mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                            mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                        }
                        else
                        {
                            switch (parse_info->prov_status)
                            {
                                case PROV_DEVICE_TRANSPORT_STATUS_UNASSIGNED:
                                case PROV_DEVICE_TRANSPORT_STATUS_ASSIGNING:
                                    if (parse_info->operation_id == NULL)
                                    {
                                        LogError("Failure operation Id invalid");
                                        mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                                        mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                                    }
                                    else if (mqtt_info->operation_id == NULL && mallocAndStrcpy_s(&mqtt_info->operation_id, parse_info->operation_id) != 0)
                                    {
                                        LogError("Failure copying operation Id");
                                        mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                                        mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                                    }
                                    else
                                    {
                                        if (mqtt_info->status_cb != NULL)
                                        {
                                            mqtt_info->status_cb(parse_info->prov_status, mqtt_info->retry_after_value, mqtt_info->status_ctx);
                                        }
                                        mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
                                    }
                                    break;
                                case PROV_DEVICE_TRANSPORT_STATUS_ASSIGNED:
                                    mqtt_info->register_data_cb(PROV_DEVICE_TRANSPORT_RESULT_OK, parse_info->authorization_key, parse_info->iothub_uri, parse_info->device_id, mqtt_info->user_ctx);
                                    mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
                                    break;
                                case PROV_DEVICE_TRANSPORT_STATUS_TRANSIENT:
                                    break;
                                case PROV_DEVICE_TRANSPORT_STATUS_ERROR:
                                default:
                                    LogError("Unable to process message reply.");
                                    mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                                    mqtt_info->mqtt_state = MQTT_STATE_ERROR;
                                    break;

                            }
                            free_json_parse_info(parse_info);
                        }
                        break;
                    }
                    case TRANSPORT_CLIENT_STATE_TRANSIENT:
                        if (mqtt_info->status_cb != NULL)
                        {
                            mqtt_info->status_cb(PROV_DEVICE_TRANSPORT_STATUS_TRANSIENT, mqtt_info->retry_after_value, mqtt_info->status_ctx);
                        }
                        mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
                        break;

                    case TRANSPORT_CLIENT_STATE_ERROR:
                        mqtt_info->register_data_cb(PROV_DEVICE_TRANSPORT_RESULT_ERROR, NULL, NULL, NULL, mqtt_info->user_ctx);
                        mqtt_info->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
                        mqtt_info->mqtt_state = MQTT_STATE_IDLE;
                        break;
                    case TRANSPORT_CLIENT_STATE_REG_SENT:
                    case TRANSPORT_CLIENT_STATE_STATUS_SENT:
                        break;

                    case TRANSPORT_CLIENT_STATE_IDLE:
                    default:
                        break;
                }
            }
        }
    }
}