in iothub_client/src/iothubtransport_amqp_device.c [934:1145]
/*
 * Runs one iteration of the AMQP device's periodic work.
 *
 * The function first advances the device state machine based on
 * instance->state:
 *   - DEVICE_STATE_STOPPING: once more than stop_delay_ms has elapsed since
 *     last_stop_request_time, internal_device_stop() is invoked.
 *   - DEVICE_STATE_STARTING: in CBS mode, authentication is started and
 *     monitored for timeout/error; once authentication is started (or the
 *     device uses X509), the telemetry and twin messengers are started and
 *     monitored. When both messengers report STARTED the device moves to
 *     DEVICE_STATE_STARTED.
 *   - DEVICE_STATE_STARTED: verifies authentication (CBS mode only) and both
 *     messengers are still in their STARTED state, otherwise moves the device
 *     to the matching error state.
 *
 * Afterwards it invokes the do_work function of every sub-component
 * (authentication in CBS mode, telemetry messenger, twin messenger) that is
 * neither stopped nor in error.
 *
 * handle: the device instance to work on. If NULL, an error is logged and
 *         nothing else happens.
 */
void amqp_device_do_work(AMQP_DEVICE_HANDLE handle)
{
    if (handle == NULL)
    {
        LogError("Failed to perform amqp_device_do_work (handle is NULL)");
    }
    else
    {
        // Cranking the state monster:
        AMQP_DEVICE_INSTANCE* instance = (AMQP_DEVICE_INSTANCE*)handle;

        if (instance->state == DEVICE_STATE_STOPPING)
        {
            tickcounter_ms_t current_time_ms;

            // The current time is written through the out-parameter, so it must be passed by address.
            if (tickcounter_get_current_ms(instance->tick_counter_handle, &current_time_ms) != 0)
            {
                LogError("Failed stopping device '%s' (could not get tickcounter time)", instance->config->device_id);
                update_state(instance, DEVICE_STATE_ERROR_MSG);
            }
            // The actual stop is deferred until the configured delay has elapsed since the stop request.
            else if ((current_time_ms - instance->last_stop_request_time) > instance->stop_delay_ms)
            {
                if (internal_device_stop(instance) != 0)
                {
                    LogError("Failed stopping device '%s'", instance->config->device_id);
                    update_state(instance, DEVICE_STATE_ERROR_MSG);
                }
            }
        }
        else if (instance->state == DEVICE_STATE_STARTING)
        {
            // Step 1 (CBS only): bring authentication up, watching for timeout and errors.
            if (instance->config->authentication_mode == DEVICE_AUTH_MODE_CBS)
            {
                if (instance->auth_state == AUTHENTICATION_STATE_STOPPED)
                {
                    if (authentication_start(instance->authentication_handle, instance->cbs_handle) != RESULT_OK)
                    {
                        LogError("Device '%s' failed to be authenticated (authentication_start failed)", instance->config->device_id);
                        update_state(instance, DEVICE_STATE_ERROR_AUTH);
                    }
                }
                else if (instance->auth_state == AUTHENTICATION_STATE_STARTING)
                {
                    int is_timed_out;

                    if (is_timeout_reached(instance->auth_state_last_changed_time, instance->auth_state_change_timeout_secs, &is_timed_out) != RESULT_OK)
                    {
                        LogError("Device '%s' failed verifying the timeout for authentication start (is_timeout_reached failed)", instance->config->device_id);
                        update_state(instance, DEVICE_STATE_ERROR_AUTH);
                    }
                    else if (is_timed_out == 1)
                    {
                        LogError("Device '%s' authentication did not complete starting within expected timeout (%lu)", instance->config->device_id, (unsigned long)instance->auth_state_change_timeout_secs);
                        update_state(instance, DEVICE_STATE_ERROR_AUTH_TIMEOUT);
                    }
                }
                else if (instance->auth_state == AUTHENTICATION_STATE_ERROR)
                {
                    if (instance->auth_error_code == AUTHENTICATION_ERROR_AUTH_FAILED)
                    {
                        update_state(instance, DEVICE_STATE_ERROR_AUTH);
                    }
                    else // Any other authentication error code is reported as a timeout.
                    {
                        update_state(instance, DEVICE_STATE_ERROR_AUTH_TIMEOUT);
                    }
                }
                // There is no AUTHENTICATION_STATE_STOPPING
            }

            // Step 2: messengers are only started once authentication is up (or is not needed, for X509).
            if (instance->config->authentication_mode == DEVICE_AUTH_MODE_X509 || instance->auth_state == AUTHENTICATION_STATE_STARTED)
            {
                size_t number_of_messengers_started = 0;

                // Telemetry messenger.
                if (instance->msgr_state == TELEMETRY_MESSENGER_STATE_STOPPED)
                {
                    if (telemetry_messenger_start(instance->messenger_handle, instance->session_handle) != RESULT_OK)
                    {
                        LogError("Device '%s' messenger failed to be started (messenger_start failed)", instance->config->device_id);
                        update_state(instance, DEVICE_STATE_ERROR_MSG);
                    }
                }
                else if (instance->msgr_state == TELEMETRY_MESSENGER_STATE_STARTING)
                {
                    int is_timed_out;

                    if (is_timeout_reached(instance->msgr_state_last_changed_time, instance->msgr_state_change_timeout_secs, &is_timed_out) != RESULT_OK)
                    {
                        LogError("Device '%s' failed verifying the timeout for messenger start (is_timeout_reached failed)", instance->config->device_id);
                        update_state(instance, DEVICE_STATE_ERROR_MSG);
                    }
                    else if (is_timed_out == 1)
                    {
                        LogError("Device '%s' messenger did not complete starting within expected timeout (%lu)", instance->config->device_id, (unsigned long)instance->msgr_state_change_timeout_secs);
                        update_state(instance, DEVICE_STATE_ERROR_MSG);
                    }
                }
                else if (instance->msgr_state == TELEMETRY_MESSENGER_STATE_ERROR)
                {
                    LogError("Device '%s' messenger failed to be started (messenger got into error state)", instance->config->device_id);
                    update_state(instance, DEVICE_STATE_ERROR_MSG);
                }
                else if (instance->msgr_state == TELEMETRY_MESSENGER_STATE_STARTED)
                {
                    number_of_messengers_started++;
                }

                // Twin messenger (evaluated regardless of the telemetry messenger outcome above).
                if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_STOPPED)
                {
                    if (twin_messenger_start(instance->twin_messenger_handle, instance->session_handle) != RESULT_OK)
                    {
                        LogError("Device '%s' twin messenger failed to be started (messenger_start failed)", instance->config->device_id);
                        update_state(instance, DEVICE_STATE_ERROR_MSG);
                    }
                }
                else if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_STARTING)
                {
                    int is_timed_out;

                    if (is_timeout_reached(instance->twin_msgr_state_last_changed_time, instance->twin_msgr_state_change_timeout_secs, &is_timed_out) != RESULT_OK)
                    {
                        LogError("Device '%s' failed verifying the timeout for twin messenger start (is_timeout_reached failed)", instance->config->device_id);
                        update_state(instance, DEVICE_STATE_ERROR_MSG);
                    }
                    else if (is_timed_out == 1)
                    {
                        LogError("Device '%s' twin messenger did not complete starting within expected timeout (%lu)", instance->config->device_id, (unsigned long)instance->twin_msgr_state_change_timeout_secs);
                        update_state(instance, DEVICE_STATE_ERROR_MSG);
                    }
                }
                else if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_ERROR)
                {
                    LogError("Device '%s' twin messenger failed to be started (messenger got into error state)", instance->config->device_id);
                    update_state(instance, DEVICE_STATE_ERROR_MSG);
                }
                else if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_STARTED)
                {
                    number_of_messengers_started++;
                }

                // The device is considered started only when both messengers are started.
                if (number_of_messengers_started == 2)
                {
                    update_state(instance, DEVICE_STATE_STARTED);
                }
            }
        }
        else if (instance->state == DEVICE_STATE_STARTED)
        {
            if (instance->config->authentication_mode == DEVICE_AUTH_MODE_CBS &&
                instance->auth_state != AUTHENTICATION_STATE_STARTED)
            {
                LogError("Device '%s' is started but authentication reported unexpected state %d", instance->config->device_id, instance->auth_state);

                // NOTE(review): the error code is consulted only when the authentication state is NOT
                // AUTHENTICATION_STATE_ERROR, which is the opposite of the DEVICE_STATE_STARTING branch above.
                // This may be an inverted condition; behavior is kept as-is - confirm intent before changing.
                if (instance->auth_state != AUTHENTICATION_STATE_ERROR)
                {
                    if (instance->auth_error_code == AUTHENTICATION_ERROR_AUTH_FAILED)
                    {
                        update_state(instance, DEVICE_STATE_ERROR_AUTH);
                    }
                    else // AUTHENTICATION_ERROR_AUTH_TIMEOUT
                    {
                        update_state(instance, DEVICE_STATE_ERROR_AUTH_TIMEOUT);
                    }
                }
                else
                {
                    update_state(instance, DEVICE_STATE_ERROR_AUTH);
                }
            }
            else
            {
                // Both checks run; either messenger leaving its STARTED state puts the device in error.
                if (instance->msgr_state != TELEMETRY_MESSENGER_STATE_STARTED)
                {
                    LogError("Device '%s' is started but messenger reported unexpected state %d", instance->config->device_id, instance->msgr_state);
                    update_state(instance, DEVICE_STATE_ERROR_MSG);
                }

                if (instance->twin_msgr_state != TWIN_MESSENGER_STATE_STARTED)
                {
                    LogError("Device '%s' is started but TWIN messenger reported unexpected state %d", instance->config->device_id, instance->twin_msgr_state);
                    update_state(instance, DEVICE_STATE_ERROR_MSG);
                }
            }
        }

        // Invoking the do_works():
        // Each sub-component is pumped only while it is neither stopped nor in error.
        if (instance->config->authentication_mode == DEVICE_AUTH_MODE_CBS)
        {
            if (instance->auth_state != AUTHENTICATION_STATE_STOPPED && instance->auth_state != AUTHENTICATION_STATE_ERROR)
            {
                authentication_do_work(instance->authentication_handle);
            }
        }

        if (instance->msgr_state != TELEMETRY_MESSENGER_STATE_STOPPED && instance->msgr_state != TELEMETRY_MESSENGER_STATE_ERROR)
        {
            telemetry_messenger_do_work(instance->messenger_handle);
        }

        if (instance->twin_msgr_state != TWIN_MESSENGER_STATE_STOPPED && instance->twin_msgr_state != TWIN_MESSENGER_STATE_ERROR)
        {
            twin_messenger_do_work(instance->twin_messenger_handle);
        }
    }
}