PutObjectOutcome ResumableUploader::Upload()

in sdk/src/resumable/ResumableUploader.cc [66:217]


// Uploads the file referenced by request_ as a multipart upload and completes it.
//
// Steps, in order:
//   1. validate() the request; a non-zero result returns the error it filled in.
//   2. getPartsToUpload() fills `uploadedParts` and `partsToUpload`
//      (presumably "already uploaded in an earlier run" vs. "still pending" --
//      confirm against getPartsToUpload()).
//   3. request_.ThreadNum() worker threads drain `partsToUpload`; each part is
//      read from the file at offset partSize_ * (PartNumber - 1) and sent with
//      UploadPartWrap(). Every attempted part and its outcome are recorded.
//   4. After all workers are joined: fail if the client has requests disabled,
//      or if any part upload failed (the first recorded failure is returned).
//   5. Parts are sorted by part number and the upload is completed, forwarding
//      the ACL / callback / encoding-type / request-payer settings of request_.
//   6. The combined CRC64 of all parts is compared with the CRC64 reported by
//      the service (skipped when the service reports 0).
//   7. removeRecordFile() is called and the result headers are assembled.
//
// Returns a successful PutObjectOutcome carrying ETag, CRC64, request id and
// (if present) version id, or an outcome wrapping the first error encountered.
PutObjectOutcome ResumableUploader::Upload()
{
    OssError err;

    if (0 != validate(err)) {
        return PutObjectOutcome(err);
    }

    PartList partsToUpload;
    PartList uploadedParts;
    if (getPartsToUpload(err, uploadedParts, partsToUpload) != 0) {
        return PutObjectOutcome(err);
    }

    std::vector<PutObjectOutcome> outcomes;
    std::vector<std::thread> threadPool;
    threadPool.reserve(request_.ThreadNum());

    // Index of the next pending part; guarded by lock_. Using a cursor instead
    // of erasing the front element keeps each hand-out O(1) under the lock.
    size_t nextPart = 0;

    for (uint32_t i = 0; i < request_.ThreadNum(); i++) {
        // Capturing by reference is safe here: every worker is joined below,
        // before any of the captured locals goes out of scope.
        threadPool.emplace_back([&]() {
            Part part;
            while (true) {
                {
                    std::lock_guard<std::mutex> lck(lock_);
                    if (nextPart >= partsToUpload.size())
                        break;
                    part = partsToUpload[nextPart];
                    ++nextPart;
                }

                // Stop picking up new work once the client has requests disabled.
                if (!client_->isEnableRequest())
                    break;

                uint64_t offset = partSize_ * (part.PartNumber() - 1);
                uint64_t length = part.Size();

                // Each part gets its own stream so workers never share a file position.
                // NOTE(review): the stream is not checked for open failure here;
                // presumably UploadPartWrap() surfaces the error -- confirm.
                auto content = GetFstreamByPath(request_.FilePath(), request_.FilePathW(),
                    std::ios::in | std::ios::binary);
                content->seekg(offset, content->beg);

                UploadPartRequest uploadPartRequest(request_.Bucket(), request_.Key(), part.PartNumber(), uploadID_, content);
                uploadPartRequest.setContentLength(length);

                // transferState must outlive the UploadPartWrap() call below because
                // the per-part progress callback receives a pointer to it.
                UploaderTransferState transferState;
                auto process = request_.TransferProgress();
                if (process.Handler) {
                    transferState.transfered = 0;
                    transferState.userData = static_cast<void*>(this);
                    TransferProgress uploadPartProcess = { UploadPartProcessCallback, static_cast<void*>(&transferState) };
                    uploadPartRequest.setTransferProgress(uploadPartProcess);
                }
                if (request_.RequestPayer() == RequestPayer::Requester) {
                    uploadPartRequest.setRequestPayer(request_.RequestPayer());
                }
                if (request_.TrafficLimit() != 0) {
                    uploadPartRequest.setTrafficLimit(request_.TrafficLimit());
                }
                auto outcome = UploadPartWrap(uploadPartRequest);
#ifdef ENABLE_OSS_TEST
                // Test hook: force part 2 to fail when the test flag is set.
                if (!!(request_.Flags() & 0x40000000) && part.PartNumber() == 2) {
                    const char* TAG = "ResumableUploadObjectClient";
                    OSS_LOG(LogLevel::LogDebug, TAG, "NO.2 part data upload failed.");
                    outcome = PutObjectOutcome();
                }
#endif // ENABLE_OSS_TEST

                if (outcome.isSuccess()) {
                    part.eTag_  = outcome.result().ETag();
                    part.cRC64_ = outcome.result().CRC64();
                }

                // Record the part and its outcome; failures are detected after the join.
                {
                    std::lock_guard<std::mutex> lck(lock_);
                    uploadedParts.push_back(part);
                    outcomes.push_back(outcome);
                }
            }
        });
    }

    for (auto& worker : threadPool) {
        if (worker.joinable()) {
            worker.join();
        }
    }

    if (!client_->isEnableRequest()) {
        return PutObjectOutcome(OssError("ClientError:100002", "Disable all requests by upper."));
    }

    // Report the first recorded part failure, if any.
    for (const auto& outcome : outcomes) {
        if (!outcome.isSuccess()) {
            return PutObjectOutcome(outcome.error());
        }
    }

    // Workers finish in arbitrary order; restore ascending part-number order.
    std::sort(uploadedParts.begin(), uploadedParts.end(), [](const Part& a, const Part& b)
    {
        return a.PartNumber() < b.PartNumber();
    });

    CompleteMultipartUploadRequest completeMultipartUploadReq(request_.Bucket(), request_.Key(), uploadedParts, uploadID_);
    if (request_.MetaData().hasHeader("x-oss-object-acl")) {
        completeMultipartUploadReq.MetaData().HttpMetaData()["x-oss-object-acl"] =
            request_.MetaData().HttpMetaData().at("x-oss-object-acl");
    }
    if (!request_.EncodingType().empty()) {
        completeMultipartUploadReq.setEncodingType(request_.EncodingType());
    }
    // The callback variables and public-key URL are only forwarded together
    // with a callback header.
    if (request_.MetaData().hasHeader("x-oss-callback")) {
        completeMultipartUploadReq.MetaData().HttpMetaData()["x-oss-callback"] =
            request_.MetaData().HttpMetaData().at("x-oss-callback");
        if (request_.MetaData().hasHeader("x-oss-callback-var")) {
            completeMultipartUploadReq.MetaData().HttpMetaData()["x-oss-callback-var"] =
                request_.MetaData().HttpMetaData().at("x-oss-callback-var");
        }
        if (request_.MetaData().hasHeader("x-oss-pub-key-url")) {
            completeMultipartUploadReq.MetaData().HttpMetaData()["x-oss-pub-key-url"] =
                request_.MetaData().HttpMetaData().at("x-oss-pub-key-url");
        }
    }
    if (request_.RequestPayer() == RequestPayer::Requester) {
        completeMultipartUploadReq.setRequestPayer(request_.RequestPayer());
    }
    auto outcome = CompleteMultipartUploadWrap(completeMultipartUploadReq);
    if (!outcome.isSuccess()) {
        return PutObjectOutcome(outcome.error());
    }

    // Verify the object CRC64: fold the per-part CRCs together in part order and
    // compare with the value reported by the service. Guard against an empty
    // part list so the first element is never read out of bounds.
    if (!uploadedParts.empty()) {
        uint64_t localCRC64 = uploadedParts[0].CRC64();
        for (size_t i = 1; i < uploadedParts.size(); i++) {
            localCRC64 = CRC64::CombineCRC(localCRC64, uploadedParts[i].CRC64(), uploadedParts[i].Size());
        }

        // A service-side CRC of 0 is treated as "not provided" and is not compared.
        uint64_t ossCRC64 = outcome.result().CRC64();
        if (ossCRC64 != 0 && localCRC64 != ossCRC64) {
            return PutObjectOutcome(OssError("CrcCheckError", "ResumableUpload Object CRC Checksum fail."));
        }
    }

    removeRecordFile();

    HeaderCollection headers;
    headers[Http::ETAG] = outcome.result().ETag();
    headers["x-oss-hash-crc64ecma"] = std::to_string(outcome.result().CRC64());
    headers["x-oss-request-id"] = outcome.result().RequestId();
    if (!outcome.result().VersionId().empty()) {
        headers["x-oss-version-id"] = outcome.result().VersionId();
    }
    return PutObjectOutcome(PutObjectResult(headers, outcome.result().Content()));
}