in src/aws-cpp-sdk-transfer/source/transfer/TransferManager.cpp [395:528]
void TransferManager::DoMultiPartUpload(const std::shared_ptr<Aws::IOStream>& streamToPut, const std::shared_ptr<TransferHandle>& handle)
{
    handle->SetIsMultipart(true);
    bool isRetry = !handle->GetMultiPartId().empty();
    uint64_t sentBytes = 0;
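    // A non-empty upload ID means CreateMultipartUpload already succeeded on a
    // previous attempt, so this call is a retry and only the failed parts are re-sent.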
    if (!isRetry) {
        Aws::S3::Model::CreateMultipartUploadRequest createMultipartRequest = m_transferConfig.createMultipartUploadTemplate;
        createMultipartRequest.SetChecksumAlgorithm(m_transferConfig.checksumAlgorithm);
        createMultipartRequest.SetCustomizedAccessLogTag(m_transferConfig.customizedAccessLogTag);
        createMultipartRequest.SetBucket(handle->GetBucketName());
        createMultipartRequest.SetContentType(handle->GetContentType());
        createMultipartRequest.SetKey(handle->GetKey());
        createMultipartRequest.SetMetadata(handle->GetMetadata());
        auto createMultipartResponse = m_transferConfig.s3Client->CreateMultipartUpload(createMultipartRequest);
        if (createMultipartResponse.IsSuccess()) {
            handle->SetMultipartId(createMultipartResponse.GetResult().GetUploadId());
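            // Split the object into bufferSize-byte parts (ceiling division); only the
            // final part may be smaller than bufferSize.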
            uint64_t totalSize = handle->GetBytesTotalSize();
            uint64_t partCount = (totalSize + m_transferConfig.bufferSize - 1) / m_transferConfig.bufferSize;
            AWS_LOGSTREAM_DEBUG(CLASS_TAG, "Transfer handle [" << handle->GetId()
                    << "] Successfully created a multi-part upload request. Upload ID: ["
                    << createMultipartResponse.GetResult().GetUploadId()
                    << "]. Splitting the multi-part upload to " << partCount << " part(s).");
            for (uint64_t i = 0; i < partCount; ++i) {
                uint64_t partSize = (std::min)(totalSize - i * m_transferConfig.bufferSize, m_transferConfig.bufferSize);
                bool lastPart = (i == partCount - 1);
                handle->AddQueuedPart(Aws::MakeShared<PartState>(CLASS_TAG, static_cast<int>(i + 1), 0, partSize, lastPart));
            }
        }
        else
        {
            AWS_LOGSTREAM_ERROR(CLASS_TAG, "Transfer handle [" << handle->GetId() << "] Failed to create a "
                    "multi-part upload request. Bucket: [" << handle->GetBucketName()
                    << "] with Key: [" << handle->GetKey() << "]. " << createMultipartResponse.GetError());
            handle->SetError(createMultipartResponse.GetError());
            handle->UpdateStatus(DetermineIfFailedOrCanceled(*handle));
            TriggerErrorCallback(handle, createMultipartResponse.GetError());
            TriggerTransferStatusUpdatedCallback(handle);
            return;
        }
    } else {
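        // Retry path: requeue only the parts that failed on the previous attempt and
        // credit the bytes from parts that already completed toward sentBytes.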
        uint64_t bytesLeft = 0;
        // No async work has been dispatched yet, so the handle's part state is still
        // only touched by this thread and can be read consistently.
        const auto failedPartsSize = handle->GetFailedParts().size();
        for (const auto& failedPart : handle->GetFailedParts()) {
            bytesLeft += failedPart.second->GetSizeInBytes();
            handle->AddQueuedPart(failedPart.second);
        }
        sentBytes = handle->GetBytesTotalSize() - bytesLeft;
        AWS_LOGSTREAM_DEBUG(CLASS_TAG, "Transfer handle [" << handle->GetId() << "] Retrying multi-part upload for "
                << failedPartsSize << " failed parts of total size " << bytesLeft
                << " bytes. Upload ID [" << handle->GetMultiPartId() << "].");
    }
    // Still single-threaded at this point, so taking a snapshot of the queued parts is safe.
    PartStateMap queuedParts = handle->GetQueuedParts();
    auto partsIter = queuedParts.begin();
    handle->UpdateStatus(TransferStatus::IN_PROGRESS);
    TriggerTransferStatusUpdatedCallback(handle);
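    // Dispatch one UploadPart per queued part. Acquire() blocks until a buffer from
    // the fixed pool is free, which caps how many parts can be in flight at once;
    // ShouldContinue() is re-checked after the wait in case the transfer was canceled.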
    while (sentBytes < handle->GetBytesTotalSize() && handle->ShouldContinue() && partsIter != queuedParts.end()) {
        auto buffer = m_bufferManager.Acquire();
        if (handle->ShouldContinue()) {
            auto lengthToWrite = partsIter->second->GetSizeInBytes();
            streamToPut->seekg((partsIter->first - 1) * m_transferConfig.bufferSize);
            streamToPut->read(reinterpret_cast<char*>(buffer), lengthToWrite);
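            // Wrap the pooled buffer in a stream without copying; it is released back
            // to the pool when this part's response is handled.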
            auto streamBuf = Aws::New<Aws::Utils::Stream::PreallocatedStreamBuf>(CLASS_TAG, buffer, static_cast<size_t>(lengthToWrite));
            auto preallocatedStreamReader = Aws::MakeShared<Aws::IOStream>(CLASS_TAG, streamBuf);
            auto self = shared_from_this(); // keep transfer manager alive until all callbacks are finished.
            PartPointer partPtr = partsIter->second;
            Aws::S3::Model::UploadPartRequest uploadPartRequest = m_transferConfig.uploadPartTemplate;
            uploadPartRequest.SetChecksumAlgorithm(m_transferConfig.checksumAlgorithm);
            uploadPartRequest.SetCustomizedAccessLogTag(m_transferConfig.customizedAccessLogTag);
            uploadPartRequest.SetBucket(handle->GetBucketName());
            uploadPartRequest.SetContentLength(static_cast<long long>(lengthToWrite));
            uploadPartRequest.SetKey(handle->GetKey());
            uploadPartRequest.SetPartNumber(partsIter->first);
            uploadPartRequest.SetUploadId(handle->GetMultiPartId());
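            // The continuation handler lets a cancel abort the part mid-request, the
            // data-sent handler feeds progress reporting, and the retry handler resets
            // the part's progress so re-sent bytes are not double counted.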
            uploadPartRequest.SetContinueRequestHandler([handle](const Aws::Http::HttpRequest*) { return handle->ShouldContinue(); });
            uploadPartRequest.SetDataSentEventHandler([self, handle, partPtr](const Aws::Http::HttpRequest*, long long amount) {
                partPtr->OnDataTransferred(amount, handle);
                self->TriggerUploadProgressCallback(handle);
            });
            uploadPartRequest.SetRequestRetryHandler([partPtr](const AmazonWebServiceRequest&) { partPtr->Reset(); });
            handle->AddPendingPart(partsIter->second);
            uploadPartRequest.SetBody(preallocatedStreamReader);
            uploadPartRequest.SetContentType(handle->GetContentType());
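            // The async context carries the handle and part state into the response
            // callback so the outcome can be matched back to the right part.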
            auto asyncContext = Aws::MakeShared<TransferHandleAsyncContext>(CLASS_TAG);
            asyncContext->handle = handle;
            asyncContext->partState = partsIter->second;
            // Placeholder handle used only to track this in-flight task until its callback has run.
            auto uploadTask = Aws::MakeShared<TransferHandle>(CLASS_TAG, handle->GetBucketName(), handle->GetKey());
            AddTask(uploadTask);
            auto callback = [self, uploadTask](const Aws::S3::S3Client* client, const Aws::S3::Model::UploadPartRequest& request,
                                               const Aws::S3::Model::UploadPartOutcome& outcome,
                                               const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) {
                self->HandleUploadPartResponse(client, request, outcome, context);
                self->RemoveTask(uploadTask);
            };
            m_transferConfig.s3Client->UploadPartAsync(uploadPartRequest, callback, asyncContext);
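            // sentBytes tracks dispatched bytes for loop control only; actual success
            // or failure of each part is recorded later in HandleUploadPartResponse.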
            sentBytes += lengthToWrite;
            ++partsIter;
        } else {
            m_bufferManager.Release(buffer);
        }
    }
    // Parts move from queued to pending only on this thread, so the snapshot taken
    // above is still consistent. Anything left in it was never dispatched because
    // ShouldContinue() returned false; mark those parts failed so a retry can pick them up.
    for (; partsIter != queuedParts.end(); ++partsIter)
    {
        handle->ChangePartToFailed(partsIter->second);
    }
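    // If any parts failed (including those just marked), resolve the handle to
    // FAILED or CANCELED and notify listeners.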
    if (handle->HasFailedParts())
    {
        handle->UpdateStatus(DetermineIfFailedOrCanceled(*handle));
        TriggerTransferStatusUpdatedCallback(handle);
    }
}