Gems/HttpRequestor/Code/Source/HttpRequestManager.cpp (174 lines of code) (raw):

/*
 * Copyright (c) Contributors to the Open 3D Engine Project.
 * For complete copyright and license terms please see the LICENSE at the root of this distribution.
 *
 * SPDX-License-Identifier: Apache-2.0 OR MIT
 *
 */

#include <AzFramework/AzFramework_Traits_Platform.h>
#include <AzCore/PlatformDef.h>
// The AWS Native SDK AWSAllocator triggers a warning due to accessing members of std::allocator directly.
// AWSAllocator.h(70): warning C4996: 'std::allocator<T>::pointer': warning STL4010: Various members of std::allocator are deprecated in C++17.
// Use std::allocator_traits instead of accessing these members directly.
// You can define _SILENCE_CXX17_OLD_ALLOCATOR_MEMBERS_DEPRECATION_WARNING or _SILENCE_ALL_CXX17_DEPRECATION_WARNINGS to acknowledge that you have received this warning.
AZ_PUSH_DISABLE_WARNING(4251 4996, "-Wunknown-warning-option")
#include <aws/core/http/HttpClient.h>
#include <aws/core/http/HttpClientFactory.h>
#include <aws/core/http/HttpRequest.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/client/ClientConfiguration.h>
AZ_POP_DISABLE_WARNING

#include <AWSNativeSDKInit/AWSNativeSDKInitBus.h>
#include <AzCore/std/string/conversions.h>

#include "HttpRequestManager.h"

namespace HttpRequestor
{
    const char* Manager::s_loggingName = "GemHttpRequestManager";

    //! Initializes the AWS Native SDK if nobody else has done so yet, then starts the
    //! background thread that services queued requests.
    Manager::Manager()
    {
        AZStd::thread_desc desc;
        desc.m_name = s_loggingName;
        desc.m_cpuId = AFFINITY_MASK_USERTHREADS;
        m_runThread = true;

        // Multiple different Gems might try to use the AWSNativeSDK, so make sure it only gets initialized / shutdown once
        // by the first Gem to try using it.
        m_ownsAwsNativeInitialization = !AWSNativeSDKInit::AWSNativeSDKInitInterface::Get()->IsInitialized();
        if (m_ownsAwsNativeInitialization)
        {
            AWSNativeSDKInit::AWSNativeSDKInitInterface::Get()->InitAwsApi();
        }

        auto function = [this]
        {
            ThreadFunction();
        };
        m_thread = AZStd::thread(desc, function);
    }

    //! Stops the background thread, waits for it to finish, and shuts the AWS Native SDK
    //! down again if this instance was the one that initialized it.
    //! Requests still queued when the worker wakes up are handled before it exits, so the
    //! join below waits for them to complete.
    Manager::~Manager()
    {
        {
            // The flag is part of the condition variable's wait predicate, so it has to be changed
            // while holding the mutex. Otherwise the worker could evaluate the predicate, miss the
            // notification below, and then block forever, which would hang the join.
            AZStd::lock_guard<AZStd::mutex> lock(m_requestMutex);
            m_runThread = false;
        }
        m_requestConditionVar.notify_all();

        if (m_thread.joinable())
        {
            m_thread.join();
        }

        // Shutdown after background thread has closed.
        if (m_ownsAwsNativeInitialization)
        {
            AWSNativeSDKInit::AWSNativeSDKInitInterface::Get()->Shutdown();
        }
    }

    //! Queues a JSON request and wakes the worker thread.
    //! @param httpRequestParameters Request description; moved into the pending queue.
    void Manager::AddRequest(Parameters&& httpRequestParameters)
    {
        {
            // Hold the lock only for the push so the notification happens outside of it.
            AZStd::lock_guard<AZStd::mutex> lock(m_requestMutex);
            m_requestsToHandle.push(AZStd::move(httpRequestParameters));
        }
        m_requestConditionVar.notify_all();
    }

    //! Queues a plain-text request and wakes the worker thread.
    //! @param httpTextRequestParameters Request description; moved into the pending queue.
    void Manager::AddTextRequest(TextParameters&& httpTextRequestParameters)
    {
        {
            // Hold the lock only for the push so the notification happens outside of it.
            AZStd::lock_guard<AZStd::mutex> lock(m_requestMutex);
            m_textRequestsToHandle.push(AZStd::move(httpTextRequestParameters));
        }
        m_requestConditionVar.notify_all();
    }

    //! @return Duration of the most recent MakeRequest call, in milliseconds.
    AZStd::chrono::milliseconds Manager::GetLastRoundTripTime() const
    {
        return m_lastRoundTripTime.load(AZStd::memory_order_relaxed);
    }

    //! Entry point of the worker thread: processes request batches until told to stop.
    void Manager::ThreadFunction()
    {
        // Run the thread as long as directed
        while (m_runThread)
        {
            HandleRequestBatch();
        }
    }

    //! Blocks until there is work (or shutdown was requested), takes everything that is
    //! currently queued, and processes it without holding the queue lock.
    void Manager::HandleRequestBatch()
    {
        // Lock mutex and wait for work to be signaled via the condition variable
        AZStd::unique_lock<AZStd::mutex> lock(m_requestMutex);
        m_requestConditionVar.wait(
            lock,
            [&]
            {
                return !m_runThread || !m_requestsToHandle.empty() || !m_textRequestsToHandle.empty();
            });

        // Swap queues so producers can keep enqueueing while this batch is being processed
        AZStd::queue<Parameters> requestsToHandle;
        requestsToHandle.swap(m_requestsToHandle);

        AZStd::queue<TextParameters> textRequestsToHandle;
        textRequestsToHandle.swap(m_textRequestsToHandle);

        // Release lock
        lock.unlock();

        // Handle requests
        while (!requestsToHandle.empty())
        {
            HandleRequest(requestsToHandle.front());
            requestsToHandle.pop();
        }

        while (!textRequestsToHandle.empty())
        {
            HandleTextRequest(textRequestsToHandle.front());
            textRequestsToHandle.pop();
        }
    }

    //! Performs one HTTP request and reports the response body, parsed as JSON, to the
    //! request's callback.
    //! The callback receives an empty JsonValue together with
    //!  - INTERNAL_SERVER_ERROR when the client/request could not be created, no response was
    //!    returned, or the body is not valid JSON;
    //!  - the server's response code when that code is anything other than OK.
    //! @param httpRequestParameters Description of the request to perform.
    void Manager::HandleRequest(const Parameters& httpRequestParameters)
    {
        Aws::Client::ClientConfiguration config = httpRequestParameters.GetClientConfiguration();
        config.enableTcpKeepAlive = AZ_TRAIT_AZFRAMEWORK_AWS_ENABLE_TCP_KEEP_ALIVE_SUPPORTED;

        std::shared_ptr<Aws::Http::HttpClient> httpClient = Aws::Http::CreateHttpClient(config);

        auto httpRequest = Aws::Http::CreateHttpRequest(
            httpRequestParameters.GetURI(), httpRequestParameters.GetMethod(), Aws::Utils::Stream::DefaultResponseStreamFactoryMethod);

        AZ_Assert(httpRequest, "HttpRequest not created!");
        if (!httpClient || !httpRequest)
        {
            // The assert alone does not stop execution in every configuration; report the failure
            // instead of dereferencing a null pointer below.
            httpRequestParameters.GetCallback()(Aws::Utils::Json::JsonValue(), Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
            return;
        }

        for (const auto& header : httpRequestParameters.GetHeaders())
        {
            httpRequest->SetHeaderValue(header.first.c_str(), header.second.c_str());
        }

        if (httpRequestParameters.GetBodyStream() != nullptr)
        {
            httpRequest->AddContentBody(httpRequestParameters.GetBodyStream());
            httpRequest->SetContentLength(AZStd::to_string(httpRequestParameters.GetBodyStream()->str().length()).c_str());
        }

        // Time only the network round trip itself.
        AZStd::chrono::steady_clock::time_point start = AZStd::chrono::steady_clock::now();
        const auto httpResponse = httpClient->MakeRequest(httpRequest);
        m_lastRoundTripTime.store(
            AZStd::chrono::duration_cast<AZStd::chrono::milliseconds>(AZStd::chrono::steady_clock::now() - start),
            AZStd::memory_order_relaxed);

        if (!httpResponse)
        {
            httpRequestParameters.GetCallback()(Aws::Utils::Json::JsonValue(), Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
            return;
        }

        if (httpResponse->GetResponseCode() != Aws::Http::HttpResponseCode::OK)
        {
            httpRequestParameters.GetCallback()(Aws::Utils::Json::JsonValue(), httpResponse->GetResponseCode());
            return;
        }

        Aws::Utils::Json::JsonValue json(httpResponse->GetResponseBody());
        if (json.WasParseSuccessful())
        {
            httpRequestParameters.GetCallback()(AZStd::move(json), httpResponse->GetResponseCode());
        }
        else
        {
            httpRequestParameters.GetCallback()(Aws::Utils::Json::JsonValue(), Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
        }
    }

    //! Performs one HTTP request and reports the raw response body as text to the request's
    //! callback.
    //! The callback receives an empty string together with
    //!  - INTERNAL_SERVER_ERROR when the client/request could not be created or no response
    //!    was returned;
    //!  - the server's response code when that code is anything other than OK.
    //! @param httpRequestParameters Description of the request to perform.
    void Manager::HandleTextRequest(const TextParameters& httpRequestParameters)
    {
        Aws::Client::ClientConfiguration config = httpRequestParameters.GetClientConfiguration();
        config.enableTcpKeepAlive = AZ_TRAIT_AZFRAMEWORK_AWS_ENABLE_TCP_KEEP_ALIVE_SUPPORTED;

        std::shared_ptr<Aws::Http::HttpClient> httpClient = Aws::Http::CreateHttpClient(config);

        auto httpRequest = Aws::Http::CreateHttpRequest(
            httpRequestParameters.GetURI(), httpRequestParameters.GetMethod(), Aws::Utils::Stream::DefaultResponseStreamFactoryMethod);

        AZ_Assert(httpRequest, "HttpRequest not created!");
        if (!httpClient || !httpRequest)
        {
            // Same guard as HandleRequest: report the failure instead of dereferencing a null pointer.
            httpRequestParameters.GetCallback()(AZStd::string(), Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
            return;
        }

        for (const auto& header : httpRequestParameters.GetHeaders())
        {
            httpRequest->SetHeaderValue(header.first.c_str(), header.second.c_str());
        }

        if (httpRequestParameters.GetBodyStream() != nullptr)
        {
            // NOTE(review): unlike HandleRequest, no Content-Length is set here - confirm whether that is intentional.
            httpRequest->AddContentBody(httpRequestParameters.GetBodyStream());
        }

        // Time only the network round trip itself.
        AZStd::chrono::steady_clock::time_point start = AZStd::chrono::steady_clock::now();
        const auto httpResponse = httpClient->MakeRequest(httpRequest);
        m_lastRoundTripTime.store(
            AZStd::chrono::duration_cast<AZStd::chrono::milliseconds>(AZStd::chrono::steady_clock::now() - start),
            AZStd::memory_order_relaxed);

        if (!httpResponse)
        {
            httpRequestParameters.GetCallback()(AZStd::string(), Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
            return;
        }

        if (httpResponse->GetResponseCode() != Aws::Http::HttpResponseCode::OK)
        {
            httpRequestParameters.GetCallback()(AZStd::string(), httpResponse->GetResponseCode());
            return;
        }

        // load up the raw output into a string
        // TODO(aaj): it feels like there should be some limit maybe 1 MB?
        std::istreambuf_iterator<char> eos;
        AZStd::string data(std::istreambuf_iterator<char>(httpResponse->GetResponseBody()), eos);
        httpRequestParameters.GetCallback()(AZStd::move(data), httpResponse->GetResponseCode());
    }
} // namespace HttpRequestor