in src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp [360:476]
std::shared_ptr<HttpResponse> CRTHttpClient::MakeRequest(const std::shared_ptr<HttpRequest>& request,
    Aws::Utils::RateLimits::RateLimiterInterface*,
    Aws::Utils::RateLimits::RateLimiterInterface*) const
{
    // Build the CRT request and the SDK response wrapper, then get (or create) a connection manager for this request.
    auto crtRequest = Crt::MakeShared<Crt::Http::HttpRequest>(Crt::g_allocator);
    auto response = Aws::MakeShared<Standard::StandardHttpResponse>(CRT_HTTP_CLIENT_TAG, request);
    auto requestConnOptions = CreateConnectionOptionsForRequest(request);
    auto connectionManager = GetWithCreateConnectionManagerForRequest(request, requestConnOptions);
    if (!connectionManager)
    {
        response->SetClientErrorMessage(aws_error_debug_str(aws_last_error()));
        response->SetClientErrorType(Client::CoreErrors::INVALID_PARAMETER_COMBINATION);
        return response;
    }
    AddRequestMetadataToCrtRequest(request, crtRequest);
    // Set the request body stream on the CRT request and set up the write rate limiter if present.
    if (request->GetContentBody())
    {
        bool isStreaming = request->IsEventStreamRequest();
        if (request->HasHeader(Aws::Http::CONTENT_ENCODING_HEADER) && request->GetHeaderValue(Aws::Http::CONTENT_ENCODING_HEADER) == Aws::Http::AWS_CHUNKED_VALUE) {
            crtRequest->SetBody(Aws::MakeShared<Aws::Utils::Stream::AwsChunkedStream<>>(CRT_HTTP_CLIENT_TAG, request.get(), request->GetContentBody()));
        } else {
            crtRequest->SetBody(Aws::MakeShared<SDKAdaptingInputStream>(CRT_HTTP_CLIENT_TAG, m_configuration.writeRateLimiter, request->GetContentBody(), *this, *request, isStreaming));
        }
    }
    Crt::Http::HttpRequestOptions requestOptions;
    requestOptions.request = crtRequest.get();
    requestOptions.onIncomingBody =
        [this, request, response](Crt::Http::HttpStream& stream, const Crt::ByteCursor& body)
        {
            OnResponseBodyReceived(stream, body, response, request, *this);
        };
    requestOptions.onIncomingHeaders =
        [response](Crt::Http::HttpStream& stream, enum aws_http_header_block block, const Crt::Http::HttpHeader* headersArray, std::size_t headersCount)
        {
            OnIncomingHeaders(stream, block, headersArray, headersCount, response);
        };
    // This will arrive at or around the same time as the headers. Use it to set the response code on the response.
    requestOptions.onIncomingHeadersBlockDone =
        [request, response](Crt::Http::HttpStream& stream, enum aws_http_header_block block)
        {
            OnIncomingHeadersBlockDone(stream, block, response);
            auto& headersHandler = request->GetHeadersReceivedEventHandler();
            if (headersHandler)
            {
                headersHandler(request.get(), response.get());
            }
        };
    // The CRT client is async-only, so we have to do the synchronous part ourselves.
    // We use a condition variable and wait on it until the request completes or errors out
    // (a minimal sketch of this waiter pattern follows the function).
    AsyncWaiter waiter;
    requestOptions.onStreamComplete =
        [&waiter, &response](Crt::Http::HttpStream& stream, int errorCode)
        {
            OnStreamComplete(stream, errorCode, waiter, response);
        };
    std::shared_ptr<Crt::Http::HttpClientConnection> connectionRef(nullptr);
    // Now that the request is fully built, acquire a connection and make the request.
    connectionManager->AcquireConnection(
        [&connectionRef, &requestOptions, response, &waiter, request, this]
        (std::shared_ptr<Crt::Http::HttpClientConnection> connection, int errorCode)
        {
            OnClientConnectionAvailable(connection, errorCode, connectionRef, requestOptions, waiter, request, response, *this);
        });
    bool waiterTimedOut = false;
    // Naive HTTP request timeout implementation. The timeout does not account for how long it took to
    // acquire a connection from the pool, and the queueing-theory implications of that choice are still
    // undecided, so if this turns out to be the wrong granularity, this is the section of code to change.
    // You could probably get "close" with an additional counter (word-sized at minimum, if not a full
    // atomic) incremented in the AcquireConnection callback, as long as the connection timeout is shorter
    // than the request timeout. Even if it is not, that would still be correct to roughly 4-5 nines,
    // since the worst case is the connect timeout getting preempted by the request timeout, which is
    // probably not worth the extra effort to handle.
    if (m_configuration.requestTimeoutMs > 0)
    {
        waiterTimedOut = !waiter.WaitOnCompletionFor(m_configuration.requestTimeoutMs);
        // If this is true, the waiter timed out without a terminal condition waking it up.
        if (waiterTimedOut)
        {
            // Close the connection if it's still there so we can expedite anything we're waiting on.
            if (connectionRef)
            {
                connectionRef->Close();
            }
        }
    }
    // Always wait here, even if the section above timed out: Wakeup() has not been called yet, which means
    // queued callbacks are still pending. Returning before they fire would cause a segfault when the
    // completion callback eventually runs, because the waiter lives on the stack.
    waiter.WaitOnCompletion();
    // Now report whether we timed out.
    if (waiterTimedOut)
    {
        response->SetClientErrorType(Aws::Client::CoreErrors::REQUEST_TIMEOUT);
        response->SetClientErrorMessage("Request Timeout Has Expired");
    }
    // TODO: is VOX support still a thing? If so we need to add the metrics for it.
    return response;
}
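
For reference, the function above relies on an AsyncWaiter to turn the CRT client's async callbacks into a blocking call. That waiter is defined elsewhere in the SDK and is not shown here; the following is only a minimal sketch of the condition-variable pattern, with method names (Wakeup, WaitOnCompletion, WaitOnCompletionFor) mirroring the calls made above rather than reproducing the SDK's actual implementation.

// Illustrative sketch only, not the SDK's AsyncWaiter.
#include <chrono>
#include <condition_variable>
#include <mutex>

class ExampleAsyncWaiter
{
public:
    // Called from a CRT callback thread when the stream completes, errors out,
    // or connection acquisition fails.
    void Wakeup()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
        }
        m_cv.notify_all();
    }

    // Blocks until Wakeup() has been called.
    void WaitOnCompletion()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_done; });
    }

    // Blocks for at most timeoutMs; returns false if the wait timed out first.
    bool WaitOnCompletionFor(long timeoutMs)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_cv.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { return m_done; });
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_done = false;
};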