in src/aws-cpp-sdk-transfer/source/transfer/TransferManager.cpp [968:1082]
void TransferManager::DoDownload(const std::shared_ptr<TransferHandle>& handle)
{
if (!handle->ShouldContinue() || !InitializePartsForDownload(handle))
{
return;
}
handle->UpdateStatus(TransferStatus::IN_PROGRESS);
TriggerTransferStatusUpdatedCallback(handle);
bool isMultipart = handle->IsMultipart();
uint64_t bufferSize = m_transferConfig.bufferSize;
if(!isMultipart)
{
// Special case this for performance (avoid the intermediate buffer write)
DoSinglePartDownload(handle);
return;
}
auto queuedParts = handle->GetQueuedParts();
auto queuedPartIter = queuedParts.begin();
while(queuedPartIter != queuedParts.end() && handle->ShouldContinue())
{
const auto& partState = queuedPartIter->second;
uint64_t rangeStart = handle->GetBytesOffset() + ( partState->GetPartId() - 1 ) * bufferSize;
uint64_t rangeEnd = rangeStart + partState->GetSizeInBytes() - 1;
auto buffer = m_bufferManager.Acquire();
partState->SetDownloadBuffer(buffer);
CreateDownloadStreamCallback responseStreamFunction = [partState, buffer, rangeEnd, rangeStart]()
{
auto bufferStream = Aws::New<Aws::Utils::Stream::DefaultUnderlyingStream>(CLASS_TAG,
Aws::MakeUnique<Aws::Utils::Stream::PreallocatedStreamBuf>(CLASS_TAG, buffer, rangeEnd - rangeStart + 1));
partState->SetDownloadPartStream(bufferStream);
return bufferStream;
};
if(handle->ShouldContinue())
{
partState->SetDownloadBuffer(buffer);
auto getObjectRangeRequest = m_transferConfig.getObjectTemplate;
getObjectRangeRequest.SetCustomizedAccessLogTag(m_transferConfig.customizedAccessLogTag);
getObjectRangeRequest.SetContinueRequestHandler([handle](const Aws::Http::HttpRequest*) { return handle->ShouldContinue(); });
getObjectRangeRequest.SetBucket(handle->GetBucketName());
getObjectRangeRequest.WithKey(handle->GetKey());
getObjectRangeRequest.SetRange(FormatRangeSpecifier(rangeStart, rangeEnd));
getObjectRangeRequest.SetResponseStreamFactory(responseStreamFunction);
getObjectRangeRequest.SetIfMatch(handle->GetEtag());
if(handle->GetVersionId().size() > 0)
{
getObjectRangeRequest.SetVersionId(handle->GetVersionId());
}
if (m_transferConfig.getObjectTemplate.SSECustomerAlgorithmHasBeenSet())
{
getObjectRangeRequest.WithSSECustomerAlgorithm(m_transferConfig.getObjectTemplate.GetSSECustomerAlgorithm())
.WithSSECustomerKey(m_transferConfig.getObjectTemplate.GetSSECustomerKey())
.WithSSECustomerKeyMD5(m_transferConfig.getObjectTemplate.GetSSECustomerKeyMD5());
}
auto self = shared_from_this(); // keep transfer manager alive until all callbacks are finished.
getObjectRangeRequest.SetDataReceivedEventHandler([self, partState, handle](const Aws::Http::HttpRequest*, Aws::Http::HttpResponse*, long long progress)
{
partState->OnDataTransferred(progress, handle);
self->TriggerDownloadProgressCallback(handle);
});
getObjectRangeRequest.SetRequestRetryHandler([self, partState, handle](const Aws::AmazonWebServiceRequest&)
{
partState->Reset();
self->TriggerDownloadProgressCallback(handle);
});
auto asyncContext = Aws::MakeShared<TransferHandleAsyncContext>(CLASS_TAG);
asyncContext->handle = handle;
asyncContext->partState = partState;
auto getObjectTask = Aws::MakeShared<TransferHandle>(CLASS_TAG, handle->GetBucketName(), handle->GetKey()); // fake handle
AddTask(getObjectTask);
auto callback =
[self, getObjectTask](const Aws::S3::S3Client* client,
const Aws::S3::Model::GetObjectRequest& request,
const Aws::S3::Model::GetObjectOutcome& outcome,
const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context)
{
self->HandleGetObjectResponse(client, request, outcome, context);
self->RemoveTask(getObjectTask);
};
handle->AddPendingPart(partState);
m_transferConfig.s3Client->GetObjectAsync(getObjectRangeRequest, callback, asyncContext);
++queuedPartIter;
}
else if(buffer)
{
m_bufferManager.Release(buffer);
break;
}
}
//parts get moved from queued to pending on this thread.
//still consistent.
while(queuedPartIter != queuedParts.end())
{
handle->ChangePartToFailed(queuedPartIter->second);
++queuedPartIter;
}
if (handle->HasFailedParts())
{
handle->UpdateStatus(DetermineIfFailedOrCanceled(*handle));
TriggerTransferStatusUpdatedCallback(handle);
}
}