in src/aws-cpp-sdk-core/source/smithy/client/AwsSmithyClientBase.cpp [446:632]
// Handles the HTTP response of one asynchronous request attempt.
//
// Steps, in order:
//  1. Stores the response in the interceptor context and runs every interceptor's
//     ModifyBeforeDeserialization hook; the first failing hook short-circuits and its
//     error is passed to the request's response handler.
//  2. Converts the response into an HttpResponseOutcome (error, embedded error, or success).
//  3. Performs retry bookkeeping, emits HTTP metrics and notifies monitoring.
//  4. On failure, decides whether to re-dispatch the request (signing-region correction,
//     clock skew adjustment, retry strategy). When it does, the request body stream is
//     rewound, the optional backoff sleep is performed and the context is handed to
//     AttemptOneRequestAsync; this function then returns without invoking the handler.
//  5. Otherwise records the attempts counter, calls Monitoring::OnFinish and passes the
//     final outcome to the response handler.
//
// pRequestCtx  - per-request async context; must be non-null (asserted). It is moved into
//                AttemptOneRequestAsync when another attempt is started.
// httpResponse - response of the attempt that just completed; must be non-null (asserted).
void AwsSmithyClientBase::HandleAsyncReply(std::shared_ptr<AwsSmithyClientAsyncRequestContext> pRequestCtx,
                                           std::shared_ptr<Aws::Http::HttpResponse> httpResponse) const
{
    assert(pRequestCtx && httpResponse);

    pRequestCtx->m_interceptorContext->SetTransmitResponse(httpResponse);
    for (const auto& interceptor : m_interceptors)
    {
        const auto modifiedResponse = interceptor->ModifyBeforeDeserialization(*pRequestCtx->m_interceptorContext);
        if (!modifiedResponse.IsSuccess())
        {
            return pRequestCtx->m_responseHandler(HttpResponseOutcome(modifiedResponse.GetError()));
        }
    }

    // Classify the response: error response, embedded error in the payload, or success.
    Aws::Client::HttpResponseOutcome outcome = [&]()
    {
        auto hasEmbeddedError = [&]() {
            return pRequestCtx->m_pRequest &&
                   pRequestCtx->m_pRequest->HasEmbeddedError(httpResponse->GetResponseBody(), httpResponse->GetHeaders());
        };

        if (Utils::DoesResponseGenerateError(httpResponse))
        {
            AWS_LOGSTREAM_DEBUG(AWS_SMITHY_CLIENT_LOG, "Request returned error. Attempting to generate appropriate error codes from response");
            assert(m_errorMarshaller);
            auto error = m_errorMarshaller->BuildAWSError(httpResponse);
            return HttpResponseOutcome(std::move(error));
        }
        else if (hasEmbeddedError()) {
            AWS_LOGSTREAM_DEBUG(AWS_SMITHY_CLIENT_LOG, "Response has embedded errors");
            // Same precondition as the other error paths that dereference the marshaller.
            assert(m_errorMarshaller);
            auto error = m_errorMarshaller->Marshall(*httpResponse);
            return HttpResponseOutcome(std::move(error) );
        }

        AWS_LOGSTREAM_DEBUG(AWS_SMITHY_CLIENT_LOG, "Request returned successful response.");
        return HttpResponseOutcome(std::move(httpResponse));
    } ();

    Aws::Monitoring::CoreMetricsCollection coreMetrics;

    do // goto in a form of "do { break; } while(0);" // TODO: refactor
    {
        // The first attempt has no previous error to report to the retry strategy.
        if (pRequestCtx->m_retryCount == 0)
        {
            m_clientConfig->retryStrategy->RequestBookkeeping(outcome);
        }
        else
        {
            assert(pRequestCtx->m_lastError);
            m_clientConfig->retryStrategy->RequestBookkeeping(outcome, pRequestCtx->m_lastError.value());
        }

        coreMetrics.httpClientMetrics = pRequestCtx->m_httpRequest->GetRequestMetrics();
        TracingUtils::EmitCoreHttpMetrics(pRequestCtx->m_httpRequest->GetRequestMetrics(),
                                          *m_clientConfig->telemetryProvider->getMeter(this->GetServiceClientName(), {}),
                                          {{TracingUtils::SMITHY_METHOD_DIMENSION, pRequestCtx->m_requestName},
                                           {TracingUtils::SMITHY_SERVICE_DIMENSION, this->GetServiceClientName()}});

        if (outcome.IsSuccess())
        {
            Aws::Monitoring::OnRequestSucceeded(this->GetServiceClientName(),
                                                pRequestCtx->m_requestName,
                                                pRequestCtx->m_httpRequest,
                                                outcome,
                                                coreMetrics,
                                                pRequestCtx->m_monitoringContexts);
            AWS_LOGSTREAM_TRACE(AWS_SMITHY_CLIENT_LOG, "Request successful returning.");
            break;
        }

        // Failure path: remember the error for the next RequestBookkeeping call.
        pRequestCtx->m_lastError = outcome.GetError();

        // Captured before any retry work so the request TTL below can be offset by the skew.
        Utils::DateTime serverTime = Utils::GetServerTimeFromError(outcome.GetError());
        auto clockSkew = Utils::DateTime::Diff(serverTime, Utils::DateTime::Now());

        Aws::Monitoring::OnRequestFailed(this->GetServiceClientName(),
                                         pRequestCtx->m_requestName,
                                         pRequestCtx->m_httpRequest,
                                         outcome,
                                         coreMetrics,
                                         pRequestCtx->m_monitoringContexts);

        if (!m_httpClient->IsRequestProcessingEnabled())
        {
            AWS_LOGSTREAM_TRACE(AWS_SMITHY_CLIENT_LOG, "Request was cancelled externally.");
            break;
        }

        // Adjust region
        // TODO: extract into common func
        // Only when the client is configured with the global region and the error carries a
        // region different from the current signing region: update the signing region on the
        // endpoint and request a retry.
        bool retryWithCorrectRegion = [&]() {
            using HttpResponseCode = Aws::Http::HttpResponseCode;
            const HttpResponseCode httpResponseCode = outcome.GetError().GetResponseCode();
            if (httpResponseCode == HttpResponseCode::MOVED_PERMANENTLY ||  // 301
                httpResponseCode == HttpResponseCode::TEMPORARY_REDIRECT || // 307
                httpResponseCode == HttpResponseCode::BAD_REQUEST ||        // 400
                httpResponseCode == HttpResponseCode::FORBIDDEN)            // 403
            {
                assert(m_errorMarshaller);
                const Aws::String regionFromResponse = m_errorMarshaller->ExtractRegion(outcome.GetError());
                const Aws::String signerRegion = [&]() {
                    if (!regionFromResponse.empty() &&
                        pRequestCtx->m_endpoint.GetAttributes() &&
                        pRequestCtx->m_endpoint.GetAttributes()->authScheme.GetSigningRegion())
                    {
                        return pRequestCtx->m_endpoint.GetAttributes()->authScheme.GetSigningRegion().value();
                    }
                    return Aws::String("");
                } ();

                if (m_clientConfig->region == Aws::Region::AWS_GLOBAL && !regionFromResponse.empty() &&
                    regionFromResponse != signerRegion) {
                    pRequestCtx->m_endpoint.AccessAttributes()->authScheme.SetSigningRegion(regionFromResponse);
                    AWS_LOGSTREAM_DEBUG(AWS_SMITHY_CLIENT_LOG, "Need to retry with a correct region");
                    return true;
                }
            }
            return false;
        } (); // <- immediately invoked lambda

        long sleepMillis = TracingUtils::MakeCallWithTiming<long>(
            [&]() -> long {
                return m_clientConfig->retryStrategy->CalculateDelayBeforeNextRetry(outcome.GetError(), static_cast<long>(pRequestCtx->m_retryCount));
            },
            TracingUtils::SMITHY_CLIENT_SERVICE_BACKOFF_DELAY_METRIC,
            *m_clientConfig->telemetryProvider->getMeter(this->GetServiceClientName(), {}),
            {{TracingUtils::SMITHY_METHOD_DIMENSION, pRequestCtx->m_requestName},
             {TracingUtils::SMITHY_SERVICE_DIMENSION, this->GetServiceClientName()}});

        bool shouldSleep = !retryWithCorrectRegion;
        if (m_clientConfig->enableClockSkewAdjustment)
        {
            // AdjustClockSkew returns true means clock skew was the problem and skew was adjusted, false otherwise.
            // sleep if clock skew and region was NOT the problem. AdjustClockSkew may update error inside outcome.
            // The call is made unconditionally (it may modify outcome); only its result is AND-ed in,
            // so a corrected skew or a corrected region each suppress the backoff sleep.
            const bool clockSkewAdjusted = this->AdjustClockSkew(outcome, pRequestCtx->m_authSchemeOption);
            shouldSleep = shouldSleep && !clockSkewAdjusted;
        }

        // A region correction always retries; otherwise the retry strategy decides.
        if (!retryWithCorrectRegion && !m_clientConfig->retryStrategy->ShouldRetry(outcome.GetError(), static_cast<long>(pRequestCtx->m_retryCount)))
        {
            break;
        }

        AWS_LOGSTREAM_WARN(AWS_SMITHY_CLIENT_LOG, "Request failed, now waiting " << sleepMillis << " ms before attempting again.");

        if(pRequestCtx->m_pRequest) {
            // Reset stream state flags and rewind the body so it can be read again.
            if (pRequestCtx->m_pRequest->GetBody()) {
                pRequestCtx->m_pRequest->GetBody()->clear();
                pRequestCtx->m_pRequest->GetBody()->seekg(0);
            }

            if (pRequestCtx->m_pRequest->GetRequestRetryHandler()) {
                pRequestCtx->m_pRequest->GetRequestRetryHandler()(*pRequestCtx->m_pRequest);
            }
        }

        if (shouldSleep)
        {
            m_httpClient->RetryRequestSleep(std::chrono::milliseconds(sleepMillis));
        }

        if (retryWithCorrectRegion)
        {
            // If the error also names an endpoint, point the request URI at that authority.
            Aws::String newEndpoint = m_errorMarshaller->ExtractEndpoint(outcome.GetError());
            if (!newEndpoint.empty()) {
                Aws::Http::URI newUri = pRequestCtx->m_endpoint.GetURI();
                newUri.SetAuthority(newEndpoint);
                pRequestCtx->m_endpoint.SetURI(newUri);
                AWS_LOGSTREAM_DEBUG(AWS_SMITHY_CLIENT_LOG, "Endpoint has been updated to " << newUri.GetURIString());
            }
        }

        if (serverTime.WasParseSuccessful() && serverTime != Utils::DateTime())
        {
            pRequestCtx->m_requestInfo.ttl = Utils::DateTime::Now() + clockSkew + std::chrono::milliseconds(m_clientConfig->requestTimeoutMs);
        }
        pRequestCtx->m_requestInfo.attempt ++;
        pRequestCtx->m_requestInfo.maxAttempts = m_clientConfig->retryStrategy->GetMaxAttempts();

        Aws::Monitoring::OnRequestRetry(this->GetServiceClientName(), pRequestCtx->m_requestName, pRequestCtx->m_httpRequest, pRequestCtx->m_monitoringContexts);

        pRequestCtx->m_retryCount++;
        // Ownership of the context moves to the next attempt; pRequestCtx must not be used afterwards.
        AttemptOneRequestAsync(std::move(pRequestCtx));
        return;
    } while(false); // end of goto in a form of "do { break; } while(false);"

    // Terminal path (success, cancellation, or no further retry): report and complete.
    auto meter = m_clientConfig->telemetryProvider->getMeter(this->GetServiceClientName(), {});
    auto counter = meter->CreateCounter(TracingUtils::SMITHY_CLIENT_SERVICE_ATTEMPTS_METRIC, TracingUtils::COUNT_METRIC_TYPE, "");
    counter->add(pRequestCtx->m_requestInfo.attempt, {{TracingUtils::SMITHY_METHOD_DIMENSION, pRequestCtx->m_requestName},
                                                      {TracingUtils::SMITHY_SERVICE_DIMENSION, this->GetServiceClientName()}});
    Aws::Monitoring::OnFinish(this->GetServiceClientName(), pRequestCtx->m_requestName, pRequestCtx->m_httpRequest, pRequestCtx->m_monitoringContexts);

    return pRequestCtx->m_responseHandler(std::move(outcome));
}