in src/aws-cpp-sdk-transfer/source/transfer/TransferManager.cpp [615:753]
/**
 * Async completion callback for a single UploadPart request of a multi-part upload.
 *
 * Responsibilities, in order:
 *  1. Return the part's pooled buffer to m_bufferManager and free the
 *     PreallocatedStreamBuf wrapper that backed the request body.
 *  2. On success: record the part's service-reported checksum, mark the part
 *     completed, and fire the upload-progress callback. If the transfer was
 *     cancelled meanwhile, mark the part failed instead (the handle's status
 *     transitions to CANCELED later, to avoid racing a restarted upload).
 *  3. On failure: mark the part failed, record the error, fire the error callback.
 *  4. If this was the last in-flight part (no pending/queued parts) and this
 *     thread wins LockForCompletion(), either send CompleteMultipartUpload
 *     (all parts succeeded and all bytes transferred) or mark the whole
 *     transfer FAILED/CANCELED.
 *
 * @param request  The UploadPart request whose body buffer must be released.
 * @param outcome  Success/failure result of the part upload.
 * @param context  A TransferHandleAsyncContext carrying the handle and part state.
 */
void TransferManager::HandleUploadPartResponse(const Aws::S3::S3Client*, const Aws::S3::Model::UploadPartRequest& request,
    const Aws::S3::Model::UploadPartOutcome& outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context)
{
    std::shared_ptr<TransferHandleAsyncContext> transferContext =
        std::const_pointer_cast<TransferHandleAsyncContext>(std::static_pointer_cast<const TransferHandleAsyncContext>(context));

    // The request body stream was built over a pooled buffer; give the raw
    // buffer back to the manager and delete the stream-buffer wrapper now that
    // the request has finished with it.
    auto originalStreamBuffer = static_cast<Aws::Utils::Stream::PreallocatedStreamBuf*>(request.GetBody()->rdbuf());
    m_bufferManager.Release(originalStreamBuffer->GetBuffer());
    Aws::Delete(originalStreamBuffer);

    const auto& handle = transferContext->handle;
    const auto& partState = transferContext->partState;

    if (outcome.IsSuccess())
    {
        if (handle->ShouldContinue())
        {
            // Stash the checksum S3 computed for this part so it can be echoed
            // back in CompleteMultipartUpload. Empty string when no checksum
            // algorithm was configured.
            partState->SetChecksum([&]() -> Aws::String {
                if (m_transferConfig.checksumAlgorithm == S3::Model::ChecksumAlgorithm::CRC32)
                {
                    return outcome.GetResult().GetChecksumCRC32();
                }
                else if (m_transferConfig.checksumAlgorithm == S3::Model::ChecksumAlgorithm::CRC32C)
                {
                    return outcome.GetResult().GetChecksumCRC32C();
                }
                else if (m_transferConfig.checksumAlgorithm == S3::Model::ChecksumAlgorithm::SHA1)
                {
                    return outcome.GetResult().GetChecksumSHA1();
                }
                else if (m_transferConfig.checksumAlgorithm == S3::Model::ChecksumAlgorithm::SHA256)
                {
                    return outcome.GetResult().GetChecksumSHA256();
                }
                //Return empty checksum for not set.
                return "";
            }());
            handle->ChangePartToCompleted(partState, outcome.GetResult().GetETag());
            AWS_LOGSTREAM_DEBUG(CLASS_TAG, "Transfer handle [" << handle->GetId()
                << "] successfully uploaded Part: [" << partState->GetPartId() << "] to Bucket: ["
                << handle->GetBucketName() << "] with Key: [" << handle->GetKey() << "] with Upload ID: ["
                << handle->GetMultiPartId() << "].");
            TriggerUploadProgressCallback(handle);
        }
        else
        {
            // marking this as failed so we eventually update the handle's status to CANCELED.
            // Updating the handle's status to CANCELED here might result in a race-condition between a
            // potentially restarting upload and a latent successful part.
            handle->ChangePartToFailed(partState);
            AWS_LOGSTREAM_WARN(CLASS_TAG, "Transfer handle [" << handle->GetId()
                << "] successfully uploaded Part: [" << partState->GetPartId() << "] to Bucket: ["
                << handle->GetBucketName() << "] with Key: [" << handle->GetKey() << "] with Upload ID: ["
                << handle->GetMultiPartId() << "] but transfer has been cancelled meanwhile.");
        }
    }
    else
    {
        AWS_LOGSTREAM_ERROR(CLASS_TAG, "Transfer handle [" << handle->GetId() << "] Failed to upload part ["
            << partState->GetPartId() << "] to Bucket: [" << handle->GetBucketName()
            << "] with Key: [" << handle->GetKey() << "] with Upload ID: [" << handle->GetMultiPartId()
            << "]. " << outcome.GetError());
        handle->ChangePartToFailed(partState);
        handle->SetError(outcome.GetError());
        TriggerErrorCallback(handle, outcome.GetError());
    }
    TriggerTransferStatusUpdatedCallback(handle);

    // Take a consistent snapshot of all part states; only the thread that sees
    // no pending/queued work AND wins LockForCompletion() finalizes the upload.
    PartStateMap pendingParts, queuedParts, failedParts, completedParts;
    handle->GetAllPartsTransactional(queuedParts, pendingParts, failedParts, completedParts);
    if (pendingParts.empty() && queuedParts.empty() && handle->LockForCompletion())
    {
        if (failedParts.empty() && (handle->GetBytesTransferred() >= handle->GetBytesTotalSize()))
        {
            // Every part succeeded: assemble the part list (number, ETag,
            // per-part checksum) and ask S3 to stitch the object together.
            Aws::S3::Model::CompletedMultipartUpload completedUpload;
            for (auto& part : handle->GetCompletedParts())
            {
                auto completedPart = Aws::S3::Model::CompletedPart()
                    .WithPartNumber(part.first)
                    .WithETag(part.second->GetETag());
                SetChecksumForAlgorithm(part.second, completedPart);
                completedUpload.AddParts(completedPart);
            }
            Aws::S3::Model::CompleteMultipartUploadRequest completeMultipartUploadRequest;
            completeMultipartUploadRequest.SetCustomizedAccessLogTag(m_transferConfig.customizedAccessLogTag);
            // Capture by copy: the continuation handler may outlive this frame.
            completeMultipartUploadRequest.SetContinueRequestHandler([=](const Aws::Http::HttpRequest*) { return handle->ShouldContinue(); });
            completeMultipartUploadRequest.WithBucket(handle->GetBucketName())
                .WithKey(handle->GetKey())
                .WithUploadId(handle->GetMultiPartId())
                .WithMultipartUpload(completedUpload);
            // Propagate SSE-C settings if the upload parts used them; the
            // complete call must present the same customer key material.
            if (m_transferConfig.uploadPartTemplate.SSECustomerAlgorithmHasBeenSet())
            {
                completeMultipartUploadRequest.WithSSECustomerAlgorithm(m_transferConfig.uploadPartTemplate.GetSSECustomerAlgorithm())
                    .WithSSECustomerKey(m_transferConfig.uploadPartTemplate.GetSSECustomerKey())
                    .WithSSECustomerKeyMD5(m_transferConfig.uploadPartTemplate.GetSSECustomerKeyMD5());
            }
            // A precomputed full-object checksum on the handle switches the
            // request to FULL_OBJECT checksum validation.
            if (!handle->GetChecksum().empty()) {
                SetChecksumOnRequest(completeMultipartUploadRequest, m_transferConfig.checksumAlgorithm, handle->GetChecksum());
                completeMultipartUploadRequest.SetChecksumType(Aws::S3::Model::ChecksumType::FULL_OBJECT);
            }
            auto completeUploadOutcome = m_transferConfig.s3Client->CompleteMultipartUpload(completeMultipartUploadRequest);
            if (completeUploadOutcome.IsSuccess())
            {
                AWS_LOGSTREAM_INFO(CLASS_TAG, "Transfer handle [" << handle->GetId()
                    << "] Multi-part upload completed successfully to Bucket: ["
                    << handle->GetBucketName() << "] with Key: [" << handle->GetKey()
                    << "] with Upload ID: [" << handle->GetMultiPartId() << "].");
                handle->UpdateStatus(TransferStatus::COMPLETED);
            }
            else
            {
                AWS_LOGSTREAM_ERROR(CLASS_TAG, "Transfer handle [" << handle->GetId()
                    << "] Failed to complete multi-part upload. In Bucket: ["
                    << handle->GetBucketName() << "] with Key: [" << handle->GetKey()
                    << "] with Upload ID: [" << handle->GetMultiPartId()
                    << "]. " << completeUploadOutcome.GetError());
                handle->UpdateStatus(DetermineIfFailedOrCanceled(*handle));
            }
        }
        else
        {
            // Some parts failed (or not all bytes made it) — resolve the
            // terminal status (FAILED vs CANCELED) from the handle itself.
            AWS_LOGSTREAM_TRACE(CLASS_TAG, "Transfer handle [" << handle->GetId() << "] " << failedParts.size()
                << " Failed parts. " << handle->GetBytesTransferred() << " bytes transferred out of "
                << handle->GetBytesTotalSize() << " total bytes.");
            handle->UpdateStatus(DetermineIfFailedOrCanceled(*handle));
        }
        TriggerTransferStatusUpdatedCallback(handle);
    }
}