DataGetOutcome ResumableDownloader::Download()

in sdk/src/resumable/ResumableDownloader.cc [31:274]


DataGetOutcome ResumableDownloader::Download()
{
    PdsError err;

    if (0 != validate(err)) {
        return DataGetOutcome(err);
    }

    PartRecordList partsToDownload;
    if (getPartsToDownload(err, partsToDownload) != 0) {
        return DataGetOutcome(err);
    }

    if (url_.empty()) {
        FileGetDownloadUrlRequest getDownloadUrlRequest(record_.driveID, record_.shareID, record_.fileID);
        getDownloadUrlRequest.setShareToken(request_.ShareToken());

        auto getDownloadUrlOutcome = FileGetDownloadUrlWrap(getDownloadUrlRequest);
        if (!getDownloadUrlOutcome.isSuccess()) {
            return DataGetOutcome(getDownloadUrlOutcome.error());
        }
        url_ = getDownloadUrlOutcome.result().Url();
    }

    //task queue
    PartRecordList downloadedParts;
    if (hasRecord_) {
        downloadedParts = record_.parts;
    }
    std::vector<DataGetOutcome> outcomes;
    std::vector<std::thread> threadPool;

    for (uint32_t i = 0; i < request_.ThreadNum(); i++) {
        threadPool.emplace_back(std::thread([&]() {
            PartRecord part;
            while (true) {
                {
                    std::lock_guard<std::mutex> lck(lock_);
                    if (partsToDownload.empty())
                        break;
                    part = partsToDownload.front();
                    partsToDownload.erase(partsToDownload.begin());
                }

                if (!client_->isEnableRequest())
                    break;

                // check resumable progress control
                if (DownloadPartProcessControlCallback((void *)this) != 0)
                    break;

                uint64_t pos = partSize_ * (part.partNumber - 1);
                uint64_t start = part.offset;
                uint64_t end = start + part.size - 1;

                auto getDataReq = DataGetByUrlRequest(url_);
                getDataReq.setResponseStreamFactory([=]() {
                    auto tmpFstream = GetFstreamByPath(request_.TempFilePath(), request_.TempFilePathW(),
                        std::ios_base::in | std::ios_base::out | std::ios_base::binary);
                    tmpFstream->seekp(pos, tmpFstream->beg);
                    return tmpFstream;
                });
                getDataReq.setRange(start, end);
                if (!crc64Hash_.empty()) {
                    getDataReq.setFlags(getDataReq.Flags() | REQUEST_FLAG_CHECK_CRC64 | REQUEST_FLAG_SAVE_CLIENT_CRC64);
                }

                auto process = request_.TransferProgress();
                if (process.Handler) {
                    TransferProgress downloadPartProcess = { DownloadPartProcessCallback, (void *)this };
                    getDataReq.setTransferProgress(downloadPartProcess);
                }
                auto progressControl = request_.ProgressControl();
                if (progressControl.Handler) {
                    ProgressControl downloadPartProgressControl = { DownloadPartProcessControlCallback, (void *)this };
                    getDataReq.setProgressControl(downloadPartProgressControl);
                }
                if (request_.TrafficLimit() != 0) {
                    getDataReq.setTrafficLimit(request_.TrafficLimit());
                }
                auto outcome = DataGetByUrlWrap(getDataReq);

                // lock
                bool needRetry = false;
                {
                    std::lock_guard<std::mutex> lck(lock_);
                    if (!outcome.isSuccess() && outcome.error().Code() == "AccessDenied" && outcome.error().Message().find("expired")) {
                        FileGetDownloadUrlRequest getDownloadUrlRequest(record_.driveID, record_.shareID ,record_.fileID);
                        getDownloadUrlRequest.setShareToken(request_.ShareToken());

                        auto getDownloadUrlOutcome = FileGetDownloadUrlWrap(getDownloadUrlRequest);
                        if (!getDownloadUrlOutcome.isSuccess()) {
                            outcomes.push_back(getDownloadUrlOutcome.error());
                            break;
                        }
                        // check file content-hash
                        auto contentHash = getDownloadUrlOutcome.result().ContentHash();
                        auto fileSize = getDownloadUrlOutcome.result().Size();
                        if (contentHash != contentHash_ || fileSize != (int64_t)fileSize_) {
                            outcomes.push_back(PdsError("SourceFileModified","Source file has been modified since last download."));
                            break;
                        }
                        url_ = getDownloadUrlOutcome.result().Url();
                        needRetry = true;
                    }
                }
                if (needRetry){
                    getDataReq.setUrl(url_);
                    getDataReq.setResponseStreamFactory([=]() {
                        auto tmpFstream = GetFstreamByPath(request_.TempFilePath(), request_.TempFilePathW(),
                            std::ios_base::in | std::ios_base::out | std::ios_base::binary);
                        tmpFstream->seekp(pos, tmpFstream->beg);
                        return tmpFstream;
                    });
                    outcome = DataGetByUrlWrap(getDataReq);
                }
#ifdef ENABLE_PDS_TEST
                if (!!(request_.Flags() & 0x40000000) && part.partNumber == 2) {
                    const char* TAG = "ResumableDownloadClient";
                    PDS_LOG(LogLevel::LogDebug, TAG, "NO.2 part data download failed.");
                    outcome = DataGetOutcome();
                }
#endif // ENABLE_PDS_TEST

                // lock
                {
                    std::lock_guard<std::mutex> lck(lock_);
                    if (outcome.isSuccess()) {
                        if (!crc64Hash_.empty()) {
                            part.crc64 = std::strtoull(outcome.result().Metadata().HttpMetaData().at("x-oss-hash-crc64ecma-by-client").c_str(), nullptr, 10);
                        }
                        downloadedParts.push_back(part);
                    }
                    outcomes.push_back(outcome);

                    //update record
                    if (hasRecordPath() && outcome.isSuccess()) {
                        auto &record = record_;
                        record.parts = downloadedParts;

                        Json::Value root;
                        root["opType"] = record.opType;
                        root["driveID"] = record.driveID;
                        root["shareID"] = record.shareID;
                        root["fileID"] = record.fileID;
                        root["contentHash"] = record.contentHash;
                        root["filePath"] = record.filePath;
                        root["mtime"] = record.mtime;
                        root["size"] = record.size;
                        root["partSize"] = record.partSize;

                        int index = 0;
                        for (PartRecord& partR : record.parts) {
                            root["parts"][index]["partNumber"] = partR.partNumber;
                            root["parts"][index]["size"] = partR.size;
                            root["parts"][index]["crc64"] = partR.crc64;
                            index++;
                        }

                        std::stringstream ss;
                        ss << root;
                        std::string md5Sum = ComputeContentETag(ss);
                        root["md5Sum"] = md5Sum;

                        auto recordStream = GetFstreamByPath(recordPath_, recordPathW_, std::ios::out);
                        if (recordStream->is_open()) {
                            *recordStream << root;
                            recordStream->close();
                        }
                    }
                }
            }
        }));
    }

    for (auto& worker : threadPool) {
        if (worker.joinable()) {
            worker.join();
        }
    }

    if (!client_->isEnableRequest()) {
        return DataGetOutcome(PdsError("ClientError:100002", "Disable all requests by upper."));
    }

    int32_t controlFlag = DownloadPartProcessControlCallback((void *)this);
    if (controlFlag == ProgressControlStop) {
        return DataGetOutcome(PdsError("ClientError:100003", "Download stop by upper."));
    }
    if (controlFlag == ProgressControlCancel) {
        removeRecordFile();
        removeTempFile();
        return DataGetOutcome(PdsError("ClientError:100004", "Download cancel by upper."));
    }

    std::shared_ptr<std::iostream> content = nullptr;
    for (auto& outcome : outcomes) {
        if (!outcome.isSuccess()) {
            return DataGetOutcome(outcome.error());
        }
        outcome.result().setContent(content);
    }

    if (downloadedParts.size() < outcomes.size()) {
        return DataGetOutcome(PdsError("DownloadNotComplete", "Not all parts are downloaded."));
    }

    std::sort(downloadedParts.begin(), downloadedParts.end(), [&](const PartRecord& a, const PartRecord& b)
    {
        return a.partNumber < b.partNumber;
    });

    // check size
    if (client_->configuration().enableCheckDownloadFileSize) {
        uint64_t localFileSize = GetFileSize(request_.TempFilePath(), request_.TempFilePathW());
        if (fileSize_ != localFileSize) {
            return DataGetOutcome(PdsError("FileSizeCheckError", "Resumable Download data check size fail."));
        }
    }

    // check crc64
    if (client_->configuration().enableCrc64 && !crc64Hash_.empty()) {
        uint64_t localCRC64 = downloadedParts[0].crc64;
        for (size_t i = 1; i < downloadedParts.size(); i++) {
            localCRC64 = CRC64::CombineCRC(localCRC64, downloadedParts[i].crc64, downloadedParts[i].size);
        }
        uint64_t serverCRC64 = std::strtoull(crc64Hash_.c_str(), nullptr, 10);
        if (localCRC64 != serverCRC64) {
            removeRecordFile();
            return DataGetOutcome(PdsError("CrcCheckError", "Resumable Download data CRC checksum fail."));
        }
    }

    if (!renameTempFile()) {
        std::stringstream ss;
        ss << "rename temp file failed";
        return DataGetOutcome(PdsError("RenameError", ss.str()));
    }

    removeRecordFile();

    DataGetResult result;
    return DataGetOutcome(result);
}