GetObjectOutcome ResumableDownloader::Download()

in sdk/src/resumable/ResumableDownloader.cc [37:248]


/**
 * Downloads the object described by request_ part by part, using
 * request_.ThreadNum() worker threads that write into a shared temp file.
 *
 * Steps:
 *  1. validate() and getPartsToDownload() determine which parts are still needed.
 *  2. Each worker repeatedly takes the next pending part (under lock_), issues a
 *     ranged GetObject whose response stream is the temp file positioned at the
 *     part's slot, and records the result. After every successful part the record
 *     file is rewritten when hasRecordPath() is true.
 *  3. Once all workers are joined, the first failed part outcome (if any) is returned.
 *  4. Without a requested range, and when enableCrc64 is configured, the per-part
 *     client CRC64 values are combined and compared with the object's CRC64.
 *     With a requested range, a Content-Range header is synthesized instead.
 *  5. renameTempFile() and removeRecordFile() finish the transfer.
 *
 * @return A successful outcome built from bucket, key and the object metadata,
 *         or an outcome carrying the first error encountered.
 */
GetObjectOutcome ResumableDownloader::Download()
{
    OssError err;

    if (0 != validate(err)) {
        return GetObjectOutcome(err);
    }

    PartRecordList partsToDownload;
    if (getPartsToDownload(err, partsToDownload) != 0) {
        return GetObjectOutcome(err);
    }

    // Parts restored from an existing record are treated as already downloaded.
    PartRecordList downloadedParts;
    if (hasRecord_) {
        downloadedParts = record_.parts;
    }
    std::vector<GetObjectOutcome> outcomes;
    std::vector<std::thread> threadPool;

    // Index of the next pending entry in partsToDownload; guarded by lock_.
    // A cursor avoids erasing from the front of the list on every dequeue.
    size_t nextPart = 0;

    for (uint32_t i = 0; i < request_.ThreadNum(); i++) {
        threadPool.emplace_back([&]() {
            PartRecord part;
            while (true) {
                {
                    std::lock_guard<std::mutex> lck(lock_);
                    if (nextPart >= partsToDownload.size())
                        break;
                    part = partsToDownload[nextPart++];
                }

                if (!client_->isEnableRequest())
                    break;

                // pos: where this part lands in the temp file; start/end: byte range in the object.
                uint64_t pos = partSize_ * (part.partNumber - 1);
                uint64_t start = part.offset;
                uint64_t end = start + part.size - 1;
                auto getObjectReq = GetObjectRequest(request_.Bucket(), request_.Key(), request_.ModifiedSinceConstraint(), request_.UnmodifiedSinceConstraint(),
                    request_.MatchingETagsConstraint(), request_.NonmatchingETagsConstraint(), request_.ResponseHeaderParameters());
                getObjectReq.setResponseStreamFactory([this, pos]() {
                    auto tmpFstream = GetFstreamByPath(request_.TempFilePath(), request_.TempFilePathW(),
                        std::ios_base::in | std::ios_base::out | std::ios_base::binary);
                    tmpFstream->seekp(pos, tmpFstream->beg);
                    return tmpFstream;
                });
                getObjectReq.setRange(start, end);
                getObjectReq.setFlags(getObjectReq.Flags() | REQUEST_FLAG_CHECK_CRC64 | REQUEST_FLAG_SAVE_CLIENT_CRC64);

                // transferState must outlive the GetObjectWrap call below because the
                // progress callback receives a pointer to it.
                DownloaderTransferState transferState;
                auto process = request_.TransferProgress();
                if (process.Handler) {
                    transferState.transfered = 0;
                    transferState.userData = static_cast<void *>(this);
                    TransferProgress partProgress = { DownloadPartProcessCallback, static_cast<void *>(&transferState) };
                    getObjectReq.setTransferProgress(partProgress);
                }
                if (request_.RequestPayer() == RequestPayer::Requester) {
                    getObjectReq.setRequestPayer(request_.RequestPayer());
                }
                if (request_.TrafficLimit() != 0) {
                    getObjectReq.setTrafficLimit(request_.TrafficLimit());
                }
                if (!request_.VersionId().empty()) {
                    getObjectReq.setVersionId(request_.VersionId());
                }
                auto outcome = GetObjectWrap(getObjectReq);
#ifdef ENABLE_OSS_TEST
                if (!!(request_.Flags() & 0x40000000) && part.partNumber == 2) {
                    const char* TAG = "ResumableDownloadObjectClient";
                    OSS_LOG(LogLevel::LogDebug, TAG, "NO.2 part data download failed.");
                    outcome = GetObjectOutcome();
                }
#endif // ENABLE_OSS_TEST

                // Shared state (downloadedParts, outcomes, record_) is only touched under lock_.
                {
                    std::lock_guard<std::mutex> lck(lock_);
                    if (outcome.isSuccess()) {
                        // Look the header up instead of using at(): an exception escaping a
                        // worker thread would terminate the whole process.
                        bool hasClientCrc = false;
                        {
                            const auto& partMeta = outcome.result().Metadata();
                            const auto& headers = partMeta.HttpMetaData();
                            const auto crcIt = headers.find("x-oss-hash-crc64ecma-by-client");
                            if (crcIt != headers.end()) {
                                part.crc64 = std::strtoull(crcIt->second.c_str(), nullptr, 10);
                                hasClientCrc = true;
                            }
                        }
                        if (hasClientCrc) {
                            downloadedParts.push_back(part);
                        }
                        else {
                            // Without the client-side CRC the part cannot be verified; report it as failed.
                            outcome = GetObjectOutcome(OssError("CrcCheckError", "ResumableDownload part client CRC64 is missing."));
                        }
                    }
                    outcomes.push_back(outcome);

                    // Persist progress after each successful part.
                    if (hasRecordPath() && outcome.isSuccess()) {
                        auto &record = record_;
                        record.parts = downloadedParts;

                        Json::Value root;
                        root["opType"] = record.opType;
                        root["bucket"] = record.bucket;
                        root["key"] = record.key;
                        root["filePath"] = record.filePath;
                        root["mtime"] = record.mtime;
                        root["size"] = record.size;
                        root["partSize"] = record.partSize;

                        int index = 0;
                        for (const PartRecord& partR : record.parts) {
                            root["parts"][index]["partNumber"] = partR.partNumber;
                            root["parts"][index]["size"] = partR.size;
                            root["parts"][index]["crc64"] = partR.crc64;
                            index++;
                        }

                        // md5Sum is computed over the document as it stands here, i.e.
                        // before md5Sum itself and the range fields are added.
                        std::stringstream ss;
                        ss << root;
                        std::string md5Sum = ComputeContentETag(ss);
                        root["md5Sum"] = md5Sum;

                        if (request_.RangeIsSet()) {
                            root["rangeStart"] = record.rangeStart;
                            root["rangeEnd"] = record.rangeEnd;
                        }

                        auto recordStream = GetFstreamByPath(recordPath_, recordPathW_, std::ios::out);
                        if (recordStream->is_open()) {
                            *recordStream << root;
                            recordStream->close();
                        }
                    }
                }
            }
        });
    }

    for (auto& worker : threadPool) {
        if (worker.joinable()) {
            worker.join();
        }
    }

    // Report the first failed part; otherwise reset each outcome's content stream.
    std::shared_ptr<std::iostream> content = nullptr;
    for (auto& outcome : outcomes) {
        if (!outcome.isSuccess()) {
            return GetObjectOutcome(outcome.error());
        }
        outcome.result().setContent(content);
    }

    if (!client_->isEnableRequest()) {
        return GetObjectOutcome(OssError("ClientError:100002", "Disable all requests by upper."));
    }

    // Workers finish in arbitrary order; CRC combination below needs ascending part order.
    std::sort(downloadedParts.begin(), downloadedParts.end(), [](const PartRecord& a, const PartRecord& b)
    {
        return a.partNumber < b.partNumber;
    });

    ObjectMetaData meta;
    if (outcomes.empty()) {
        // Nothing was fetched in this run, so the metadata has to come from a HeadObject call.
        HeadObjectRequest hRequest(request_.Bucket(), request_.Key());
        if (request_.RequestPayer() == RequestPayer::Requester) {
            hRequest.setRequestPayer(request_.RequestPayer());
        }
        if (!request_.VersionId().empty()) {
            hRequest.setVersionId(request_.VersionId());
        }
        auto hOutcome = client_->HeadObject(hRequest);
        if (!hOutcome.isSuccess()) {
            return GetObjectOutcome(hOutcome.error());
        }
        meta = hOutcome.result();
    }
    else {
        meta = outcomes[0].result().Metadata();
    }
    meta.setContentLength(contentLength_);

    //check crc and update metadata
    if (!request_.RangeIsSet()) {
        if (client_->configuration().enableCrc64) {
            // Guard against an empty part list before indexing it. 0 is used as the
            // checksum in that case -- presumably the CRC64 of an empty object; confirm.
            uint64_t localCRC64 = 0;
            if (!downloadedParts.empty()) {
                localCRC64 = downloadedParts[0].crc64;
                for (size_t i = 1; i < downloadedParts.size(); i++) {
                    localCRC64 = CRC64::CombineCRC(localCRC64, downloadedParts[i].crc64, downloadedParts[i].size);
                }
            }
            if (localCRC64 != meta.CRC64()) {
                return GetObjectOutcome(OssError("CrcCheckError", "ResumableDownload object CRC checksum fail."));
            }
        }
        meta.HttpMetaData().erase(Http::CONTENT_RANGE);
    }
    else {
        // Describe the requested range relative to the full object size.
        std::stringstream ss;
        ss << "bytes " << std::to_string(request_.RangeStart()) << "-";
        if (request_.RangeEnd() != -1) {
            ss << std::to_string(request_.RangeEnd()) << "/" << std::to_string(objectSize_);
        }
        else {
            ss << std::to_string(objectSize_ - 1) << "/" << std::to_string(objectSize_);
        }
        meta.HttpMetaData()["Content-Range"] = ss.str();
    }

    // The per-part client CRC header is internal bookkeeping; erase() is a no-op when absent.
    meta.HttpMetaData().erase("x-oss-hash-crc64ecma-by-client");

    if (!renameTempFile()) {
        return GetObjectOutcome(OssError("RenameError", "rename temp file failed"));
    }

    removeRecordFile();

    GetObjectResult result(request_.Bucket(), request_.Key(), meta);
    return GetObjectOutcome(result);
}