void amqp_device_do_work()

in iothub_client/src/iothubtransport_amqp_device.c [934:1145]


// Runs one iteration of the device state machine and then pumps the do_work
// functions of the sub-components (authentication, telemetry messenger and
// twin messenger).
//
// Parameters:
//   handle - device instance to work on. If NULL, an error is logged and
//            nothing else is done.
//
// Per-state behavior of a single call:
//   DEVICE_STATE_STOPPING - once more than stop_delay_ms have elapsed since
//       last_stop_request_time, internal_device_stop() is invoked. Failing to
//       read the tick counter or to stop results in DEVICE_STATE_ERROR_MSG.
//   DEVICE_STATE_STARTING - in CBS mode, authentication is started/monitored
//       first. Once authentication is started (or immediately in X509 mode),
//       both messengers are started/monitored; DEVICE_STATE_STARTED is only
//       requested when both messengers report being started.
//   DEVICE_STATE_STARTED  - verifies that authentication (CBS mode only) and
//       both messengers are still started, requesting the matching error
//       state otherwise.
//
// All state changes are requested through update_state(). This function
// returns nothing; failures are surfaced only through logging and the
// requested device state.
void amqp_device_do_work(AMQP_DEVICE_HANDLE handle)
{
    if (handle == NULL)
    {
        LogError("Failed to perform amqp_device_do_work (handle is NULL)");
    }
    else
    {
        // Cranking the state monster:
        AMQP_DEVICE_INSTANCE* instance = (AMQP_DEVICE_INSTANCE*)handle;

        if (instance->state == DEVICE_STATE_STOPPING)
        {
            tickcounter_ms_t current_time_ms;

            if (tickcounter_get_current_ms(instance->tick_counter_handle, &current_time_ms) != 0)
            {
                LogError("Failed stopping device '%s' (could not get tickcounter time)", instance->config->device_id);
                update_state(instance, DEVICE_STATE_ERROR_MSG);
            }
            // The actual stop is deferred until the configured delay has elapsed since the stop request.
            else if ((current_time_ms - instance->last_stop_request_time) > instance->stop_delay_ms)
            {
                if (internal_device_stop(instance) != 0)
                {
                    LogError("Failed stopping device '%s'", instance->config->device_id);
                    update_state(instance, DEVICE_STATE_ERROR_MSG);
                }
            }
        }
        else if (instance->state == DEVICE_STATE_STARTING)
        {
            // Step 1 (CBS only): bring authentication up before touching the messengers.
            if (instance->config->authentication_mode == DEVICE_AUTH_MODE_CBS)
            {
                if (instance->auth_state == AUTHENTICATION_STATE_STOPPED)
                {
                    if (authentication_start(instance->authentication_handle, instance->cbs_handle) != RESULT_OK)
                    {
                        LogError("Device '%s' failed to be authenticated (authentication_start failed)", instance->config->device_id);

                        update_state(instance, DEVICE_STATE_ERROR_AUTH);
                    }
                }
                else if (instance->auth_state == AUTHENTICATION_STATE_STARTING)
                {
                    // Authentication start is in progress; only check that it has not taken too long.
                    int is_timed_out;
                    if (is_timeout_reached(instance->auth_state_last_changed_time, instance->auth_state_change_timeout_secs, &is_timed_out) != RESULT_OK)
                    {
                        LogError("Device '%s' failed verifying the timeout for authentication start (is_timeout_reached failed)", instance->config->device_id);
                        update_state(instance, DEVICE_STATE_ERROR_AUTH);
                    }
                    else if (is_timed_out == 1)
                    {
                        LogError("Device '%s' authentication did not complete starting within expected timeout (%lu)", instance->config->device_id, (unsigned long)instance->auth_state_change_timeout_secs);

                        update_state(instance, DEVICE_STATE_ERROR_AUTH_TIMEOUT);
                    }
                }
                else if (instance->auth_state == AUTHENTICATION_STATE_ERROR)
                {
                    // Map the authentication error code to the matching device error state.
                    if (instance->auth_error_code == AUTHENTICATION_ERROR_AUTH_FAILED)
                    {
                        update_state(instance, DEVICE_STATE_ERROR_AUTH);
                    }
                    else // DEVICE_STATE_ERROR_TIMEOUT
                    {
                        update_state(instance, DEVICE_STATE_ERROR_AUTH_TIMEOUT);
                    }
                }
                // There is no AUTHENTICATION_STATE_STOPPING
            }

            // Step 2: messengers are only handled once authentication is started (X509 mode has no such step here).
            if (instance->config->authentication_mode == DEVICE_AUTH_MODE_X509 || instance->auth_state == AUTHENTICATION_STATE_STARTED)
            {
                // Counts how many of the two messengers (telemetry, twin) report being started.
                size_t number_of_messengers_started = 0;

                if (instance->msgr_state == TELEMETRY_MESSENGER_STATE_STOPPED)
                {
                    if (telemetry_messenger_start(instance->messenger_handle, instance->session_handle) != RESULT_OK)
                    {
                        LogError("Device '%s' messenger failed to be started (messenger_start failed)", instance->config->device_id);

                        update_state(instance, DEVICE_STATE_ERROR_MSG);
                    }
                }
                else if (instance->msgr_state == TELEMETRY_MESSENGER_STATE_STARTING)
                {
                    int is_timed_out;
                    if (is_timeout_reached(instance->msgr_state_last_changed_time, instance->msgr_state_change_timeout_secs, &is_timed_out) != RESULT_OK)
                    {
                        LogError("Device '%s' failed verifying the timeout for messenger start (is_timeout_reached failed)", instance->config->device_id);

                        update_state(instance, DEVICE_STATE_ERROR_MSG);
                    }
                    else if (is_timed_out == 1)
                    {
                        LogError("Device '%s' messenger did not complete starting within expected timeout (%lu)", instance->config->device_id, (unsigned long)instance->msgr_state_change_timeout_secs);

                        update_state(instance, DEVICE_STATE_ERROR_MSG);
                    }
                }
                else if (instance->msgr_state == TELEMETRY_MESSENGER_STATE_ERROR)
                {
                    LogError("Device '%s' messenger failed to be started (messenger got into error state)", instance->config->device_id);

                    update_state(instance, DEVICE_STATE_ERROR_MSG);
                }
                else if (instance->msgr_state == TELEMETRY_MESSENGER_STATE_STARTED)
                {
                    number_of_messengers_started++;
                }

                // NOTE: the twin messenger is evaluated in the same pass regardless of the outcome
                // of the telemetry messenger handling above.
                if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_STOPPED)
                {
                    if (twin_messenger_start(instance->twin_messenger_handle, instance->session_handle) != RESULT_OK)
                    {
                        LogError("Device '%s' twin messenger failed to be started (messenger_start failed)", instance->config->device_id);

                        update_state(instance, DEVICE_STATE_ERROR_MSG);
                    }
                }
                else if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_STARTING)
                {
                    int is_timed_out;
                    if (is_timeout_reached(instance->twin_msgr_state_last_changed_time, instance->twin_msgr_state_change_timeout_secs, &is_timed_out) != RESULT_OK)
                    {
                        LogError("Device '%s' failed verifying the timeout for twin messenger start (is_timeout_reached failed)", instance->config->device_id);

                        update_state(instance, DEVICE_STATE_ERROR_MSG);
                    }
                    else if (is_timed_out == 1)
                    {
                        LogError("Device '%s' twin messenger did not complete starting within expected timeout (%lu)", instance->config->device_id, (unsigned long)instance->twin_msgr_state_change_timeout_secs);

                        update_state(instance, DEVICE_STATE_ERROR_MSG);
                    }
                }
                else if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_ERROR)
                {
                    LogError("Device '%s' twin messenger failed to be started (messenger got into error state)", instance->config->device_id);

                    update_state(instance, DEVICE_STATE_ERROR_MSG);
                }
                else if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_STARTED)
                {
                    number_of_messengers_started++;
                }

                // The device is only considered started when both messengers are up.
                if (number_of_messengers_started == 2)
                {
                    update_state(instance, DEVICE_STATE_STARTED);
                }
            }
        }
        else if (instance->state == DEVICE_STATE_STARTED)
        {
            if (instance->config->authentication_mode == DEVICE_AUTH_MODE_CBS &&
                instance->auth_state != AUTHENTICATION_STATE_STARTED)
            {
                LogError("Device '%s' is started but authentication reported unexpected state %d", instance->config->device_id, instance->auth_state);

                // The error code is only inspected when authentication is actually in the
                // error state (same mapping as in the DEVICE_STATE_STARTING branch above).
                // This condition used to be inverted ('!='), so the error code was consulted
                // only when authentication was NOT in error, and a timeout was never reported
                // as DEVICE_STATE_ERROR_AUTH_TIMEOUT.
                if (instance->auth_state == AUTHENTICATION_STATE_ERROR)
                {
                    if (instance->auth_error_code == AUTHENTICATION_ERROR_AUTH_FAILED)
                    {
                        update_state(instance, DEVICE_STATE_ERROR_AUTH);
                    }
                    else // AUTHENTICATION_ERROR_AUTH_TIMEOUT
                    {
                        update_state(instance, DEVICE_STATE_ERROR_AUTH_TIMEOUT);
                    }
                }
                else
                {
                    // Any other unexpected authentication state is reported as a generic authentication error.
                    update_state(instance, DEVICE_STATE_ERROR_AUTH);
                }
            }
            else
            {
                if (instance->msgr_state != TELEMETRY_MESSENGER_STATE_STARTED)
                {
                    LogError("Device '%s' is started but messenger reported unexpected state %d", instance->config->device_id, instance->msgr_state);
                    update_state(instance, DEVICE_STATE_ERROR_MSG);
                }

                if (instance->twin_msgr_state != TWIN_MESSENGER_STATE_STARTED)
                {
                    LogError("Device '%s' is started but TWIN messenger reported unexpected state %d", instance->config->device_id, instance->twin_msgr_state);
                    update_state(instance, DEVICE_STATE_ERROR_MSG);
                }
            }
        }

        // Invoking the do_works():
        // Sub-components that are stopped or in error are not pumped.
        if (instance->config->authentication_mode == DEVICE_AUTH_MODE_CBS)
        {
            if (instance->auth_state != AUTHENTICATION_STATE_STOPPED && instance->auth_state != AUTHENTICATION_STATE_ERROR)
            {
                authentication_do_work(instance->authentication_handle);
            }
        }

        if (instance->msgr_state != TELEMETRY_MESSENGER_STATE_STOPPED && instance->msgr_state != TELEMETRY_MESSENGER_STATE_ERROR)
        {
            telemetry_messenger_do_work(instance->messenger_handle);
        }

        if (instance->twin_msgr_state != TWIN_MESSENGER_STATE_STOPPED && instance->twin_msgr_state != TWIN_MESSENGER_STATE_ERROR)
        {
            twin_messenger_do_work(instance->twin_messenger_handle);
        }
    }
}