in provisioning_client/src/prov_transport_amqp_common.c [1453:1573]
/*
 * Advances the provisioning AMQP transport by one step.
 *
 * handle: transport instance, used as a PROV_TRANSPORT_AMQP_INFO*. A NULL
 *         handle makes the call a no-op.
 *
 * Behaviour, keyed on amqp_state:
 *   - AMQP_STATE_DISCONNECTED: tries create_connection(); moves to
 *     AMQP_STATE_CONNECTING on success, or flags both state fields as ERROR.
 *   - AMQP_STATE_IDLE: nothing is done.
 *   - anything else: connection_dowork() is pumped, and if the AMQP state is
 *     then CONNECTED or ERROR the transport_state switch below is serviced.
 */
void prov_transport_common_amqp_dowork(PROV_DEVICE_TRANSPORT_HANDLE handle)
{
    PROV_TRANSPORT_AMQP_INFO* transport = (PROV_TRANSPORT_AMQP_INFO*)handle;

    if (transport == NULL)
    {
        return;
    }

    if (transport->amqp_state == AMQP_STATE_DISCONNECTED)
    {
        if (create_connection(transport) == 0)
        {
            transport->amqp_state = AMQP_STATE_CONNECTING;
        }
        else
        {
            LogError("unable to create amqp connection");
            transport->amqp_state = AMQP_STATE_ERROR;
            transport->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
        }
        return;
    }

    if (transport->amqp_state == AMQP_STATE_IDLE)
    {
        return;
    }

    connection_dowork(transport->connection);

    /* The transport state is only serviced once the AMQP layer reports CONNECTED or ERROR. */
    if (transport->amqp_state != AMQP_STATE_CONNECTED && transport->amqp_state != AMQP_STATE_ERROR)
    {
        return;
    }

    switch (transport->transport_state)
    {
        case TRANSPORT_CLIENT_STATE_REG_SEND:
            if (send_amqp_message(transport, AMQP_REGISTER_ME) == 0)
            {
                transport->transport_state = TRANSPORT_CLIENT_STATE_REG_SENT;
            }
            else
            {
                transport->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                transport->amqp_state = AMQP_STATE_ERROR;
            }
            break;

        case TRANSPORT_CLIENT_STATE_REG_RECV:
        case TRANSPORT_CLIENT_STATE_STATUS_RECV:
        {
            /* Hand the stored payload to the JSON parse callback. */
            PROV_JSON_INFO* reply = transport->json_parse_cb(transport->payload_data, transport->json_ctx);
            if (reply == NULL)
            {
                LogError("Unable to process registration reply.");
                transport->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                transport->amqp_state = AMQP_STATE_ERROR;
                break;
            }

            /* Default to IDLE; the failure branches below override this. */
            transport->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
            switch (reply->prov_status)
            {
                case PROV_DEVICE_TRANSPORT_STATUS_UNASSIGNED:
                case PROV_DEVICE_TRANSPORT_STATUS_ASSIGNING:
                    if (reply->operation_id == NULL)
                    {
                        LogError("Failure operation Id invalid");
                        transport->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                        transport->amqp_state = AMQP_STATE_ERROR;
                    }
                    /* The operation id is copied only when none has been stored yet. */
                    else if (transport->operation_id == NULL && mallocAndStrcpy_s(&transport->operation_id, reply->operation_id) != 0)
                    {
                        LogError("Failure copying operation Id");
                        transport->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                        transport->amqp_state = AMQP_STATE_ERROR;
                    }
                    else if (transport->status_cb != NULL)
                    {
                        transport->status_cb(reply->prov_status, transport->retry_after_value, transport->status_ctx);
                    }
                    break;

                case PROV_DEVICE_TRANSPORT_STATUS_ASSIGNED:
                    transport->register_data_cb(PROV_DEVICE_TRANSPORT_RESULT_OK, reply->authorization_key, reply->iothub_uri, reply->device_id, transport->user_ctx);
                    /* Written again after the callback returns, as the original code does. */
                    transport->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
                    break;

                case PROV_DEVICE_TRANSPORT_STATUS_ERROR:
                default:
                    LogError("Unable to process message reply.");
                    transport->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                    transport->amqp_state = AMQP_STATE_ERROR;
                    break;
            }
            free_json_parse_info(reply);
            break;
        }

        case TRANSPORT_CLIENT_STATE_STATUS_SEND:
            if (send_amqp_message(transport, AMQP_OPERATION_STATUS) == 0)
            {
                transport->transport_state = TRANSPORT_CLIENT_STATE_STATUS_SENT;
            }
            else
            {
                transport->transport_state = TRANSPORT_CLIENT_STATE_ERROR;
                transport->amqp_state = AMQP_STATE_ERROR;
            }
            break;

        case TRANSPORT_CLIENT_STATE_ERROR:
            /* Report the failure through the registration callback, then set both state fields to IDLE. */
            transport->register_data_cb(PROV_DEVICE_TRANSPORT_RESULT_ERROR, NULL, NULL, NULL, transport->user_ctx);
            transport->transport_state = TRANSPORT_CLIENT_STATE_IDLE;
            transport->amqp_state = AMQP_STATE_IDLE;
            break;

        case TRANSPORT_CLIENT_STATE_REG_SENT:
        case TRANSPORT_CLIENT_STATE_STATUS_SENT:
            /* Check timeout (not implemented here). */
            break;

        case TRANSPORT_CLIENT_STATE_IDLE:
        default:
            break;
    }
}