void TransferManager::DoMultiPartUpload()

in src/aws-cpp-sdk-transfer/source/transfer/TransferManager.cpp [395:528]


        void TransferManager::DoMultiPartUpload(const std::shared_ptr<Aws::IOStream>& streamToPut, const std::shared_ptr<TransferHandle>& handle)
        {
            handle->SetIsMultipart(true);

            bool isRetry = !handle->GetMultiPartId().empty();
            uint64_t sentBytes = 0;

            if (!isRetry) {
              Aws::S3::Model::CreateMultipartUploadRequest createMultipartRequest = m_transferConfig.createMultipartUploadTemplate;
              createMultipartRequest.SetChecksumAlgorithm(m_transferConfig.checksumAlgorithm);
              createMultipartRequest.SetCustomizedAccessLogTag(m_transferConfig.customizedAccessLogTag);
              createMultipartRequest.SetBucket(handle->GetBucketName());
              createMultipartRequest.SetContentType(handle->GetContentType());
              createMultipartRequest.SetKey(handle->GetKey());
              createMultipartRequest.SetMetadata(handle->GetMetadata());

              auto createMultipartResponse = m_transferConfig.s3Client->CreateMultipartUpload(createMultipartRequest);
              if (createMultipartResponse.IsSuccess()) {
                handle->SetMultipartId(createMultipartResponse.GetResult().GetUploadId());
                uint64_t totalSize = handle->GetBytesTotalSize();
                uint64_t partCount = (totalSize + m_transferConfig.bufferSize - 1) / m_transferConfig.bufferSize;
                AWS_LOGSTREAM_DEBUG(CLASS_TAG, "Transfer handle [" << handle->GetId()
                                                                   << "] Successfully created a multi-part upload request. Upload ID: ["
                                                                   << createMultipartResponse.GetResult().GetUploadId()
                                                                   << "]. Splitting the multi-part upload to " << partCount << " part(s).");

                for (uint64_t i = 0; i < partCount; ++i) {
                  uint64_t partSize = (std::min)(totalSize - i * m_transferConfig.bufferSize, m_transferConfig.bufferSize);
                  bool lastPart = (i == partCount - 1) ? true : false;
                  handle->AddQueuedPart(Aws::MakeShared<PartState>(CLASS_TAG, static_cast<int>(i + 1), 0, partSize, lastPart));
                }
                }
                else
                {
                    AWS_LOGSTREAM_ERROR(CLASS_TAG, "Transfer handle [" << handle->GetId() << "] Failed to create a "
                            "multi-part upload request. Bucket: [" << handle->GetBucketName()
                            << "] with Key: [" << handle->GetKey() << "]. " << createMultipartResponse.GetError());
                    handle->SetError(createMultipartResponse.GetError());
                    handle->UpdateStatus(DetermineIfFailedOrCanceled(*handle));

                    TriggerErrorCallback(handle, createMultipartResponse.GetError());
                    TriggerTransferStatusUpdatedCallback(handle);
                    return;
                }
            } else {
              uint64_t bytesLeft = 0;
              // at this point we've been going synchronously so this is consistent
              const auto failedPartsSize = handle->GetFailedParts().size();
              for (auto failedParts : handle->GetFailedParts()) {
                bytesLeft += failedParts.second->GetSizeInBytes();
                handle->AddQueuedPart(failedParts.second);
              }

              sentBytes = handle->GetBytesTotalSize() - bytesLeft;

              AWS_LOGSTREAM_DEBUG(CLASS_TAG, "Transfer handle [" << handle->GetId() << "] Retrying multi-part upload for "
                                                                 << failedPartsSize << " failed parts of total size " << bytesLeft
                                                                 << " bytes. Upload ID [" << handle->GetMultiPartId() << "].");
            }

            //still consistent
            PartStateMap queuedParts = handle->GetQueuedParts();
            auto partsIter = queuedParts.begin();

            handle->UpdateStatus(TransferStatus::IN_PROGRESS);
            TriggerTransferStatusUpdatedCallback(handle);

            while (sentBytes < handle->GetBytesTotalSize() && handle->ShouldContinue() && partsIter != queuedParts.end()) {
              auto buffer = m_bufferManager.Acquire();
              if (handle->ShouldContinue()) {
                auto lengthToWrite = partsIter->second->GetSizeInBytes();
                streamToPut->seekg((partsIter->first - 1) * m_transferConfig.bufferSize);
                streamToPut->read(reinterpret_cast<char*>(buffer), lengthToWrite);

                auto streamBuf = Aws::New<Aws::Utils::Stream::PreallocatedStreamBuf>(CLASS_TAG, buffer, static_cast<size_t>(lengthToWrite));
                auto preallocatedStreamReader = Aws::MakeShared<Aws::IOStream>(CLASS_TAG, streamBuf);

                auto self = shared_from_this();  // keep transfer manager alive until all callbacks are finished.
                PartPointer partPtr = partsIter->second;
                Aws::S3::Model::UploadPartRequest uploadPartRequest = m_transferConfig.uploadPartTemplate;
                uploadPartRequest.SetChecksumAlgorithm(m_transferConfig.checksumAlgorithm);
                uploadPartRequest.SetCustomizedAccessLogTag(m_transferConfig.customizedAccessLogTag);
                uploadPartRequest.SetBucket(handle->GetBucketName());
                uploadPartRequest.SetContentLength(static_cast<long long>(lengthToWrite));
                uploadPartRequest.SetKey(handle->GetKey());
                uploadPartRequest.SetPartNumber(partsIter->first);
                uploadPartRequest.SetUploadId(handle->GetMultiPartId());

                uploadPartRequest.SetContinueRequestHandler([handle](const Aws::Http::HttpRequest*) { return handle->ShouldContinue(); });
                uploadPartRequest.SetDataSentEventHandler([self, handle, partPtr](const Aws::Http::HttpRequest*, long long amount) {
                  partPtr->OnDataTransferred(amount, handle);
                  self->TriggerUploadProgressCallback(handle);
                });
                uploadPartRequest.SetRequestRetryHandler([partPtr](const AmazonWebServiceRequest&) { partPtr->Reset(); });

                handle->AddPendingPart(partsIter->second);

                uploadPartRequest.SetBody(preallocatedStreamReader);
                uploadPartRequest.SetContentType(handle->GetContentType());

                auto asyncContext = Aws::MakeShared<TransferHandleAsyncContext>(CLASS_TAG);
                asyncContext->handle = handle;
                asyncContext->partState = partsIter->second;

                auto uploadTask = Aws::MakeShared<TransferHandle>(CLASS_TAG, handle->GetBucketName(), handle->GetKey());  // fake handle
                AddTask(uploadTask);
                auto callback = [self, uploadTask](const Aws::S3::S3Client* client, const Aws::S3::Model::UploadPartRequest& request,
                                                   const Aws::S3::Model::UploadPartOutcome& outcome,
                                                   const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) {
                  self->HandleUploadPartResponse(client, request, outcome, context);
                  self->RemoveTask(uploadTask);
                };

                m_transferConfig.s3Client->UploadPartAsync(uploadPartRequest, callback, asyncContext);
                sentBytes += lengthToWrite;

                ++partsIter;
              } else {
                m_bufferManager.Release(buffer);
              }
            }
            //parts get moved from queued to pending on this thread.
            //still consistent.
            for (; partsIter != queuedParts.end(); ++partsIter)
            {
                handle->ChangePartToFailed(partsIter->second);
            }

            if (handle->HasFailedParts())
            {
                handle->UpdateStatus(DetermineIfFailedOrCanceled(*handle));
                TriggerTransferStatusUpdatedCallback(handle);
            }
        }