in src/ClientCoreState.cpp [131:186]
void ClientCoreState::ProcessOutboundActionQueue(std::shared_ptr<std::atomic_bool> thread_task_out_sync) {
ResponseCode rc = ResponseCode::SUCCESS;
int action_execution_delay = 1000 / MAX_CORE_ACTION_PROCESSING_RATE_HZ;
std::atomic_bool &_thread_task_out_sync = *thread_task_out_sync;
do {
// Reset ResponseCode state
rc = ResponseCode::SUCCESS;
if (/*!process_queued_actions_ || */outbound_action_queue_.empty()) {
std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_CORE_THREAD_SLEEP_DURATION_MS));
continue;
}
std::lock_guard<std::mutex> sync_action_lock(sync_action_request_lock_);
auto next = std::chrono::system_clock::now() + std::chrono::milliseconds(action_execution_delay);
ActionType action_type = outbound_action_queue_.front().first;
std::shared_ptr<ActionData> p_action_data = outbound_action_queue_.front().second;
outbound_action_queue_.pop();
util::Map<ActionType, std::unique_ptr<Action>>::const_iterator itr = action_map_.find(action_type);
ActionData::AsyncAckNotificationHandlerPtr p_async_ack_handler = p_action_data->p_async_ack_handler_;
if (itr != action_map_.end()) {
if (nullptr != p_async_ack_handler) {
// Add Ack before sending request. Read request runs in separate thread and may receive response
// before ack is added, if we add it after sending the request.
rc = RegisterPendingAck(p_action_data->GetActionId(), p_async_ack_handler);
if (ResponseCode::SUCCESS != rc) {
p_async_ack_handler(p_action_data->GetActionId(), rc);
AWS_LOG_ERROR(LOG_TAG_CLIENT_CORE_STATE,
"Registering Ack Handler for Outbound Queued Action failed. %s",
ResponseHelper::ToString(rc).c_str());
}
}
// rc will be ResponseCode::SUCCESS by default at this point if no Ack handler was provided
if (ResponseCode::SUCCESS == rc) {
rc = itr->second->PerformAction(p_network_connection_, p_action_data);
if (ResponseCode::SUCCESS != rc) {
if (nullptr != p_async_ack_handler) {
// Delete waiting for Ack for Failed Actions
DeletePendingAck(p_action_data->GetActionId());
p_async_ack_handler(p_action_data->GetActionId(), rc);
}
AWS_LOG_ERROR(LOG_TAG_CLIENT_CORE_STATE,
"Performing Outbound Queued Action failed. %s",
ResponseHelper::ToString(rc).c_str());
}
}
} else {
rc = ResponseCode::ACTION_NOT_REGISTERED_ERROR;
AWS_LOG_ERROR(LOG_TAG_CLIENT_CORE_STATE,
"Performing Outbound Queued Action failed. %s",
ResponseHelper::ToString(rc).c_str());
}
// This is not perfect since we have no control over how long an action takes.
// But it will definitely ensure that we don't exceed the max rate
std::this_thread::sleep_until(next);
} while (_thread_task_out_sync);
}