void prov_transport_common_amqp_dowork()

in provisioning_client/src/prov_transport_amqp_common.c [1453:1573]


void prov_transport_common_amqp_dowork(PROV_DEVICE_TRANSPORT_HANDLE handle)
{
    if (handle != NULL)
    {
        PROV_TRANSPORT_AMQP_INFO* amqp_info = (PROV_TRANSPORT_AMQP_INFO*)handle;
        if (amqp_info->amqp_state == AMQP_STATE_DISCONNECTED)
        {
            if (create_connection(amqp_info) != 0)
            {
                LogError("unable to create amqp connection");
                amqp_info->amqp_state = AMQP_STATE_ERROR;
                amqp_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
            }
            else
            {
                amqp_info->amqp_state = AMQP_STATE_CONNECTING;
            }
        }
        else if (amqp_info->amqp_state != AMQP_STATE_IDLE)
        {
            connection_dowork(amqp_info->connection);
            if (amqp_info->amqp_state == AMQP_STATE_CONNECTED || amqp_info->amqp_state == AMQP_STATE_ERROR)
            {
                switch (amqp_info->transport_state)
                {
                case TRANSPORT_CLIENT_STATE_REG_SEND:
                    if (send_amqp_message(amqp_info, AMQP_REGISTER_ME) != 0)
                    {
                        amqp_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                        amqp_info->amqp_state = AMQP_STATE_ERROR;
                    }
                    else
                    {
                        amqp_info->transport_state = TRANSPORT_CLIENT_STATE_REG_SENT;
                    }
                    break;

                case TRANSPORT_CLIENT_STATE_REG_RECV:
                case TRANSPORT_CLIENT_STATE_STATUS_RECV:
                {
                    PROV_JSON_INFO* parse_info = amqp_info->json_parse_cb(amqp_info->payload_data, amqp_info->json_ctx);
                    if (parse_info == NULL)
                    {
                        LogError("Unable to process registration reply.");
                        amqp_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                        amqp_info->amqp_state = AMQP_STATE_ERROR;
                    }
                    else
                    {
                        amqp_info->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
                        switch (parse_info->prov_status)
                        {
                            case PROV_DEVICE_TRANSPORT_STATUS_UNASSIGNED:
                            case PROV_DEVICE_TRANSPORT_STATUS_ASSIGNING:
                                if (parse_info->operation_id == NULL)
                                {
                                    LogError("Failure operation Id invalid");
                                    amqp_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                                    amqp_info->amqp_state = AMQP_STATE_ERROR;
                                }
                                else if (amqp_info->operation_id == NULL && mallocAndStrcpy_s(&amqp_info->operation_id, parse_info->operation_id) != 0)
                                {
                                    LogError("Failure copying operation Id");
                                    amqp_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                                    amqp_info->amqp_state = AMQP_STATE_ERROR;
                                }
                                else
                                {
                                    if (amqp_info->status_cb != NULL)
                                    {
                                        amqp_info->status_cb(parse_info->prov_status, amqp_info->retry_after_value, amqp_info->status_ctx);
                                    }
                                }
                                break;

                            case PROV_DEVICE_TRANSPORT_STATUS_ASSIGNED:
                                amqp_info->register_data_cb(PROV_DEVICE_TRANSPORT_RESULT_OK, parse_info->authorization_key, parse_info->iothub_uri, parse_info->device_id, amqp_info->user_ctx);
                                amqp_info->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
                                break;
                            case PROV_DEVICE_TRANSPORT_STATUS_ERROR:
                            default:
                                LogError("Unable to process message reply.");
                                amqp_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                                amqp_info->amqp_state = AMQP_STATE_ERROR;
                                break;

                        }
                        free_json_parse_info(parse_info);
                    }
                    break;
                }

                case TRANSPORT_CLIENT_STATE_STATUS_SEND:
                    if (send_amqp_message(amqp_info, AMQP_OPERATION_STATUS) != 0)
                    {
                        amqp_info->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                        amqp_info->amqp_state = AMQP_STATE_ERROR;
                    }
                    else
                    {
                        amqp_info->transport_state = TRANSPORT_CLIENT_STATE_STATUS_SENT;
                    }
                    break;

                case TRANSPORT_CLIENT_STATE_ERROR:
                    amqp_info->register_data_cb(PROV_DEVICE_TRANSPORT_RESULT_ERROR, NULL, NULL, NULL, amqp_info->user_ctx);
                    amqp_info->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
                    amqp_info->amqp_state = AMQP_STATE_IDLE;
                    break;
                case TRANSPORT_CLIENT_STATE_REG_SENT:
                case TRANSPORT_CLIENT_STATE_STATUS_SENT:
                    // Check timout
                    break;
                case TRANSPORT_CLIENT_STATE_IDLE:
                default:
                    break;
                }
            }
        }
    }
}