static taskResult upload_multipart()

in ptest/src/Program.cc [264:368]


static taskResult upload_multipart(const OssClient &client, const std::string &key, const std::string &fileToUpload)
{
    taskResult result;
    result.transferSize = get_file_size(fileToUpload);
    result.startTimePoint = std::chrono::system_clock::now();

    // Set the part size 
    const int partSize = Config::PartSize;
    const int64_t fileLength = result.transferSize;
    auto partCount = calculate_part_count(fileLength, partSize);

    std::vector< SubTaskInfo> SubTaskInfos;
    for (auto i = 0; i < partCount; i++) {
        SubTaskInfo subinfo;
        subinfo.partNum = i + 1;
        subinfo.offset  = partSize;
        subinfo.offset *= i;
        SubTaskInfos.push_back(subinfo);
    }

    std::vector<PutObjectOutcomeCallable> outcomes;
    bool failed = false;
    std::string code;
    std::string message;
    InitiateMultipartUploadRequest imuRequest(Config::BucketName, key);
    auto initOutcome = client.InitiateMultipartUpload(imuRequest);
    if (initOutcome.isSuccess()) {
        while (!SubTaskInfos.empty() && !failed) {
            if (outcomes.size() < static_cast<size_t>(Config::Parallel)) {
                SubTaskInfo subinfo = *SubTaskInfos.begin();
                SubTaskInfos.erase(SubTaskInfos.begin());
                std::shared_ptr<std::iostream> content = std::make_shared<std::fstream>(fileToUpload, std::ios::in | std::ios::binary);
                content->seekg(subinfo.offset);
                UploadPartRequest request(Config::BucketName, key, content);
                request.setPartNumber(subinfo.partNum);
                request.setUploadId(initOutcome.result().UploadId());
                request.setContentLength(Config::PartSize);
                auto outcomeCallable = client.UploadPartCallable(request);
                outcomes.emplace_back(std::move(outcomeCallable));
            }
            //check all outcomes 
            for (auto it = outcomes.begin(); !failed && it != outcomes.end();) {
                auto status = it->wait_for(std::chrono::milliseconds(100));
                if (status == std::future_status::ready) {
                    auto outcome = it->get();
                    if (!outcome.isSuccess()) {
                        failed  = true;
                        code    = outcome.error().Code();
                        message = outcome.error().Message();
                    }
                    it = outcomes.erase(it);
                }
                else {
                    it++;
                }
            }
        }

        for (auto it = outcomes.begin(); !failed && it != outcomes.end(); it++) {
            auto outcome = it->get();
            if (!outcome.isSuccess()) {
                failed  = true;
                code    = outcome.error().Code();
                message = outcome.error().Message();
            }
        }

        if (!failed) {
            auto listOutcome = client.ListParts(ListPartsRequest(Config::BucketName, key, initOutcome.result().UploadId()));
            if (listOutcome.isSuccess()) {
                auto cOutcome = client.CompleteMultipartUpload(
                    CompleteMultipartUploadRequest(Config::BucketName, key, 
                        listOutcome.result().PartList(),
                        initOutcome.result().UploadId()));
                if (!cOutcome.isSuccess()) {
                    failed = true;
                    code = listOutcome.error().Code();
                    message = listOutcome.error().Message();
                }
                else {
                    if (cOutcome.result().CRC64() != uploadFileCRC64) {
                        failed  = true;
                        code    = "CRC64 check fail.";
                        message = "CRC64 check fail.";
                    }
                }
            }
            else {
                failed  = true;
                code    = listOutcome.error().Code();
                message = listOutcome.error().Message();
            }
        }
    }
    else {
        failed = true;
        code = initOutcome.error().Code();
        message = initOutcome.error().Message();
    }

    result.stopTimePoint = std::chrono::system_clock::now();
    result.success = !failed;

    return result;
}