CopyObjectOutcome ResumableCopier::Copy()

in sdk/src/resumable/ResumableCopier.cc [34:172]


// Copies the source object to the destination as a multipart upload whose
// parts are produced by UploadPartCopy requests issued from
// request_.ThreadNum() worker threads, then completes the upload.
//
// Flow:
//   1. validate() and getPartsToUploadCopy() must both return 0; otherwise the
//      OssError they filled in is returned.
//   2. Each worker repeatedly takes the next pending part, issues an
//      UploadPartCopy for it and records the outcome. Successful parts are
//      appended to partsCopied and reported to the transfer-progress handler
//      (if one is set).
//   3. After all workers are joined, the first failed part outcome (in the
//      order outcomes were recorded) is returned as the error.
//   4. If any worker had to skip a part because requests were disabled, or
//      requests are disabled now, "ClientError:100002" is returned.
//   5. Otherwise the parts are sorted by part number, CompleteMultipartUpload
//      is called, the record file is removed and a HeadObject is issued to
//      fill in LastModified.
//
// Returns a CopyObjectOutcome holding either the CopyObjectResult (ETag,
// request id and version id taken from the CompleteMultipartUpload result,
// plus LastModified when the HeadObject succeeds) or the error described above.
CopyObjectOutcome ResumableCopier::Copy()
{
    OssError err;

    if (0 != validate(err)) {
        return CopyObjectOutcome(err);
    }

    PartList partsToUploadCopy;
    PartList partsCopied;
    if (getPartsToUploadCopy(err, partsCopied, partsToUploadCopy) != 0) {
        return CopyObjectOutcome(err);
    }

    std::vector<UploadPartCopyOutcome> outcomes;
    std::vector<std::thread> threadPool;

    // Shared worker state. While the workers run, nextPending, partSkipped,
    // partsCopied, outcomes and consumedSize_ are only touched under lock_.
    //
    // nextPending is a cursor into partsToUploadCopy: handing out parts by
    // index avoids shifting the whole list on every take.
    size_t nextPending = 0;
    // Set when a worker took a part but did not copy it because requests were
    // disabled. Such a part has no entry in outcomes, so without this flag it
    // would go unnoticed if requests are enabled again by the time the
    // post-join check runs, and the upload would be completed without it.
    bool partSkipped = false;

    for (uint32_t i = 0; i < request_.ThreadNum(); i++) {
        threadPool.emplace_back([&]() {
            Part part;
            while (true) {
                {
                    std::lock_guard<std::mutex> lck(lock_);
                    if (nextPending >= partsToUploadCopy.size())
                        break;
                    part = partsToUploadCopy[nextPending++];
                }

                if (!client_->isEnableRequest()) {
                    std::lock_guard<std::mutex> lck(lock_);
                    partSkipped = true;
                    break;
                }

                // Byte range of this part inside the source object. The offset
                // formula assumes part numbers start at 1 and every preceding
                // part is partSize_ bytes long; the range end is passed as the
                // index of the last byte.
                uint64_t offset = partSize_ * (part.PartNumber() - 1);
                uint64_t length = part.Size();

                auto uploadPartCopyReq = UploadPartCopyRequest(request_.Bucket(), request_.Key(), request_.SrcBucket(), request_.SrcKey(),
                    uploadID_, part.PartNumber(), 
                    request_.SourceIfMatchEtag(), request_.SourceIfNotMatchEtag(),
                    request_.SourceIfModifiedSince(), request_.SourceIfUnModifiedSince());
                uploadPartCopyReq.setCopySourceRange(offset, offset + length - 1);
                if (request_.RequestPayer() == RequestPayer::Requester) {
                    uploadPartCopyReq.setRequestPayer(request_.RequestPayer());
                }
                if (request_.TrafficLimit() != 0) {
                    uploadPartCopyReq.setTrafficLimit(request_.TrafficLimit());
                }
                if (!request_.VersionId().empty()) {
                    uploadPartCopyReq.setVersionId(request_.VersionId());
                }
                auto outcome = client_->UploadPartCopy(uploadPartCopyReq);
#ifdef ENABLE_OSS_TEST
                // Test hook: with flag bit 0x40000000 set, parts 2 and 4 have
                // their outcome replaced by a default-constructed one to
                // simulate a failed part copy.
                if (!!(request_.Flags() & 0x40000000) && (part.PartNumber() == 2 || part.PartNumber() == 4)) {
                    const char* TAG = "ResumableCopyObjectClient";
                    OSS_LOG(LogLevel::LogDebug, TAG, "NO.%d part data copy failed!", part.PartNumber());
                    outcome = UploadPartCopyOutcome();
                }
#endif // ENABLE_OSS_TEST

                // Publish the result of this part.
                {
                    std::lock_guard<std::mutex> lck(lock_);
                    outcomes.push_back(outcome);
                    if (outcome.isSuccess()) {
                        part.eTag_ = outcome.result().ETag();
                        partsCopied.push_back(part);

                        auto process = request_.TransferProgress();
                        if (process.Handler) {
                            consumedSize_ += length;
                            process.Handler(static_cast<size_t>(length), consumedSize_, objectSize_, process.UserData);
                        }
                    }
                }
            }
        });
    }

    for (auto& worker : threadPool) {
        if (worker.joinable()) {
            worker.join();
        }
    }

    // A failed part takes precedence over the "requests disabled" error.
    for (const auto& outcome : outcomes) {
        if (!outcome.isSuccess()) {
            return CopyObjectOutcome(outcome.error());
        }
    }

    // partSkipped is read without the lock: every worker has been joined.
    if (partSkipped || !client_->isEnableRequest()) {
        return CopyObjectOutcome(OssError("ClientError:100002", "Disable all requests by upper."));
    }

    // Workers finish in arbitrary order; the completion request is given the
    // parts in ascending part-number order.
    std::sort(partsCopied.begin(), partsCopied.end(), [](const Part& a, const Part& b) 
    {
        return a.PartNumber() < b.PartNumber();
    });

    CompleteMultipartUploadRequest completeMultipartUploadReq(request_.Bucket(), request_.Key(), partsCopied, uploadID_);
    if (request_.MetaData().HttpMetaData().find("x-oss-object-acl")
        != request_.MetaData().HttpMetaData().end()) {
        std::string aclName = request_.MetaData().HttpMetaData().at("x-oss-object-acl");
        completeMultipartUploadReq.setAcl(ToAclType(aclName.c_str()));
    }
    if (!request_.EncodingType().empty()) {
        completeMultipartUploadReq.setEncodingType(request_.EncodingType());
    }
    if (request_.RequestPayer() == RequestPayer::Requester) {
        completeMultipartUploadReq.setRequestPayer(request_.RequestPayer());
    }

    auto compOutcome = client_->CompleteMultipartUpload(completeMultipartUploadReq);
    if (!compOutcome.isSuccess()) {
        return CopyObjectOutcome(compOutcome.error());
    }

    removeRecordFile();

    // HeadObject is only used to obtain LastModified; if it fails the copy is
    // still reported as successful and LastModified is simply not set.
    CopyObjectResult result;
    HeadObjectRequest hRequest(request_.Bucket(), request_.Key());
    if (request_.RequestPayer() == RequestPayer::Requester) {
        hRequest.setRequestPayer(request_.RequestPayer());
    }
    if (!compOutcome.result().VersionId().empty()) {
        hRequest.setVersionId(compOutcome.result().VersionId());
    }
    auto hOutcome = client_->HeadObject(hRequest);
    if (hOutcome.isSuccess()) {
        result.setLastModified(hOutcome.result().LastModified());
    }
    result.setEtag(compOutcome.result().ETag());
    result.setRequestId(compOutcome.result().RequestId());
    result.setVersionId(compOutcome.result().VersionId());
    return CopyObjectOutcome(result);
}