void ClientCoreState::ProcessOutboundActionQueue()

in src/ClientCoreState.cpp [131:186]


    void ClientCoreState::ProcessOutboundActionQueue(std::shared_ptr<std::atomic_bool> thread_task_out_sync) {
        ResponseCode rc = ResponseCode::SUCCESS;
        int action_execution_delay = 1000 / MAX_CORE_ACTION_PROCESSING_RATE_HZ;
        std::atomic_bool &_thread_task_out_sync = *thread_task_out_sync;
        do {
            // Reset ResponseCode state
            rc = ResponseCode::SUCCESS;
            if (/*!process_queued_actions_ || */outbound_action_queue_.empty()) {
                std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_CORE_THREAD_SLEEP_DURATION_MS));
                continue;
            }
            std::lock_guard<std::mutex> sync_action_lock(sync_action_request_lock_);
            auto next = std::chrono::system_clock::now() + std::chrono::milliseconds(action_execution_delay);
            ActionType action_type = outbound_action_queue_.front().first;
            std::shared_ptr<ActionData> p_action_data = outbound_action_queue_.front().second;

            outbound_action_queue_.pop();
            util::Map<ActionType, std::unique_ptr<Action>>::const_iterator itr = action_map_.find(action_type);
            ActionData::AsyncAckNotificationHandlerPtr p_async_ack_handler = p_action_data->p_async_ack_handler_;
            if (itr != action_map_.end()) {
                if (nullptr != p_async_ack_handler) {
                    // Add Ack before sending request. Read request runs in separate thread and may receive response
                    // before ack is added, if we add it after sending the request.
                    rc = RegisterPendingAck(p_action_data->GetActionId(), p_async_ack_handler);
                    if (ResponseCode::SUCCESS != rc) {
                        p_async_ack_handler(p_action_data->GetActionId(), rc);
                        AWS_LOG_ERROR(LOG_TAG_CLIENT_CORE_STATE,
                                      "Registering Ack Handler for Outbound Queued Action failed. %s",
                                      ResponseHelper::ToString(rc).c_str());
                    }
                }
                // rc will be ResponseCode::SUCCESS by default at this point if no Ack handler was provided
                if (ResponseCode::SUCCESS == rc) {
                    rc = itr->second->PerformAction(p_network_connection_, p_action_data);
                    if (ResponseCode::SUCCESS != rc) {
                        if (nullptr != p_async_ack_handler) {
                            // Delete waiting for Ack for Failed Actions
                            DeletePendingAck(p_action_data->GetActionId());
                            p_async_ack_handler(p_action_data->GetActionId(), rc);
                        }
                        AWS_LOG_ERROR(LOG_TAG_CLIENT_CORE_STATE,
                                      "Performing Outbound Queued Action failed. %s",
                                      ResponseHelper::ToString(rc).c_str());
                    }
                }
            } else {
                rc = ResponseCode::ACTION_NOT_REGISTERED_ERROR;
                AWS_LOG_ERROR(LOG_TAG_CLIENT_CORE_STATE,
                              "Performing Outbound Queued Action failed. %s",
                              ResponseHelper::ToString(rc).c_str());
            }
            // This is not perfect since we have no control over how long an action takes.
            // But it will definitely ensure that we don't exceed the max rate
            std::this_thread::sleep_until(next);
        } while (_thread_task_out_sync);
    }