FileCompleteOutcome ResumableUploader::Upload()

in sdk/src/resumable/ResumableUploader.cc [65:283]


FileCompleteOutcome ResumableUploader::Upload()
{
    FileCompleteOutcome validateOutcome;
    if (0 != validate(validateOutcome)) {
        return validateOutcome;
    }

    PdsError err;

    UploadPartRecordList partsToUpload;
    UploadPartRecordList uploadedParts;
    if (getPartsToUpload(err, uploadedParts, partsToUpload) != 0){
        return FileCompleteOutcome(err);
    }

    std::vector<DataPutOutcome> outcomes;

    // 顺序上传分片
    UploadPartRecord part;
    while (true) {
        // need lock when parallel upload
        {
            // std::lock_guard<std::mutex> lck(lock_);
            if (partsToUpload.empty())
                break;
            part = partsToUpload.front();
            partsToUpload.erase(partsToUpload.begin());
        }

        if (!client_->isEnableRequest())
            break;

        // check resumable progress control
        if (UploadPartProcessControlCallback((void *)this) != 0)
            break;

        uint64_t offset = partSize_ * (part.partNumber - 1);
        uint64_t length = part.size;

        auto content = GetFstreamByPath(request_.FilePath(), request_.FilePathW(),
            std::ios::in | std::ios::binary);
        content->seekg(offset, content->beg);

        PartInfoReqList partInfoReqList;
        PartInfoReq info(part.partNumber, part.size, offset, offset+length);
        partInfoReqList.push_back(info);
        FileGetUploadUrlRequest getUploadPartUrlRequest(record_.driveID, record_.fileID, record_.uploadID, partInfoReqList);
        auto getUploadPartUrlOutcome = FileGetUploadUrlWrap(getUploadPartUrlRequest);
        if (!getUploadPartUrlOutcome.isSuccess()) {
            return FileCompleteOutcome(getUploadPartUrlOutcome.error());
        }

        auto partInfoResp = getUploadPartUrlOutcome.result().PartInfoRespList();
        if (partInfoResp.empty()) {
            return FileCompleteOutcome(PdsError("GetUploadUrlError", "Get Upload url empty."));
        }

        DataPutByUrlRequest putPartRequest(partInfoResp[0].UploadUrl(), content);
        putPartRequest.setContentLength(length);
        putPartRequest.setFlags(putPartRequest.Flags() | REQUEST_FLAG_CHECK_CRC64 | REQUEST_FLAG_SAVE_CLIENT_CRC64);
        auto process = request_.TransferProgress();
        if (process.Handler) {
            TransferProgress uploadPartProcess = { UploadPartProcessCallback, (void *)this };
            putPartRequest.setTransferProgress(uploadPartProcess);
        }
        auto progressControl = request_.ProgressControl();
        if (progressControl.Handler) {
            ProgressControl uploadPartProgressControl = { UploadPartProcessControlCallback, (void *)this };
            putPartRequest.setProgressControl(uploadPartProgressControl);
        }
        if (request_.TrafficLimit() != 0) {
            putPartRequest.setTrafficLimit(request_.TrafficLimit());
        }
        auto putPartOutcome = UploadPartWrap(putPartRequest);
#ifdef ENABLE_PDS_TEST
        if (!!(request_.Flags() & 0x40000000) && part.PartNumber() == 2) {
            const char* TAG = "ResumableUploadClient";
            PDS_LOG(LogLevel::LogDebug, TAG, "NO.2 part data upload failed.");
            outcome = DataPutOutcome();
        }
#endif // ENABLE_PDS_TEST

        // local record lack of this uploaded part info, cause repeated upload
        bool partAlreadyExist = false;
        if (!putPartOutcome.isSuccess() && putPartOutcome.error().Code() == "PartAlreadyExist" &&
            putPartOutcome.error().Message().find("sequential")) {
                partAlreadyExist = true;
        }

        // need lock when parallel upload
        {
            // std::lock_guard<std::mutex> lck(lock_);
            if (putPartOutcome.isSuccess() || partAlreadyExist) {
                part.crc64 = putPartOutcome.result().CRC64();
                uploadedParts.push_back(part);
            }

            outcomes.push_back(putPartOutcome);

            //update record
            if (hasRecordPath() && (putPartOutcome.isSuccess() || partAlreadyExist)) {
                auto &record = record_;
                record.parts = uploadedParts;

                Json::Value root;
                root["opType"] = record_.opType;
                root["driveID"] = record_.driveID;
                root["fileID"] = record_.fileID;
                root["uploadID"] = record_.uploadID;
                root["name"] = record_.name;
                root["filePath"] = record_.filePath;
                root["taskID"] = record_.taskID;
                root["mtime"] = record_.mtime;
                root["size"] = record_.size;
                root["partSize"] = record_.partSize;

                int index = 0;
                for (UploadPartRecord& partR : record.parts) {
                    root["parts"][index]["partNumber"] = partR.partNumber;
                    root["parts"][index]["size"] = partR.size;
                    root["parts"][index]["crc64"] = partR.crc64;
                    index++;
                }

                std::stringstream ss;
                ss << root;
                std::string md5Sum = ComputeContentETag(ss);
                root["md5Sum"] = md5Sum;

                auto recordStream = GetFstreamByPath(recordPath_, recordPathW_, std::ios::out);
                if (recordStream->is_open()) {
                    *recordStream << root;
                    recordStream->close();
                }
            }
        }
    }

    if (!client_->isEnableRequest()) {
        return FileCompleteOutcome(PdsError("ClientError:100002", "Disable all requests by upper."));
    }

    int32_t controlFlag = UploadPartProcessControlCallback((void *)this);
    if (controlFlag == ProgressControlStop) {
        return FileCompleteOutcome(PdsError("ClientError:100003", "Upload stop by upper."));
    }
    if (controlFlag == ProgressControlCancel) {
        removeRecordFile();
        return FileCompleteOutcome(PdsError("ClientError:100004", "Upload cancel by upper."));
    }

    for (const auto& outcome : outcomes) {
        if (!outcome.isSuccess()) {
            // ignore PartAlreadyExist error
            if (outcome.error().Code() == "PartAlreadyExist" && outcome.error().Message().find("sequential")) {
                continue;
            }
            return FileCompleteOutcome(outcome.error());
        }
    }

    if (uploadedParts.size() < outcomes.size()) {
        return FileCompleteOutcome(PdsError("UploadNotComplete", "Not all parts are uploaded."));
    }

    // sort uploadedParts
    std::sort(uploadedParts.begin(), uploadedParts.end(), [&](const UploadPartRecord& a, const UploadPartRecord& b)
    {
        return a.partNumber < b.partNumber;
    });

    FileCompleteRequest completeReq(record_.driveID, record_.fileID, record_.uploadID);
    auto completeOutcome = FileCompleteWrap(completeReq);
    if (!completeOutcome.isSuccess()) {
        removeRecordFile();
        return FileCompleteOutcome(completeOutcome.error());
    }

    // check size
    uint64_t uploadedfileSize = completeOutcome.result().Size();
    if (fileSize_ != uploadedfileSize) {
        return FileCompleteOutcome(PdsError("FileSizeCheckError", "Resumable Upload data check size fail."));
    }

    //check crc64
    std::string crc64Hash = completeOutcome.result().Crc64Hash();
    if (client_->configuration().enableCrc64 && !crc64Hash.empty()) {
        uint64_t localCRC64 = uploadedParts[0].crc64;
        for (size_t i = 1; i < uploadedParts.size(); i++) {
            localCRC64 = CRC64::CombineCRC(localCRC64, uploadedParts[i].crc64, uploadedParts[i].size);
        }
        uint64_t serverCRC64 = std::strtoull(crc64Hash.c_str(), nullptr, 10);
        if (localCRC64 != serverCRC64) {
            // check sha1
            std::string cHash = completeOutcome.result().ContentHash();
            std::string cHashName = completeOutcome.result().ContentHashName();
            if (cHashName == "sha1" && !cHash.empty()) {
                std::string hashSHA1;
                if (0 != ComputeFileSha1(completeOutcome, hashSHA1)){
                    return completeOutcome;
                }
                transform(cHash.begin(), cHash.end(), cHash.begin(), ::toupper);
                transform(hashSHA1.begin(), hashSHA1.end(), hashSHA1.begin(), ::toupper);
                if (0 != cHash.compare(hashSHA1)) {
                    removeRecordFile();
                    return FileCompleteOutcome(PdsError("Sha1CheckError", "Resumable Upload data SHA1 checksum fail."));
                }
            }
            else {
                removeRecordFile();
                return FileCompleteOutcome(PdsError("CrcCheckError", "Resumable Upload data CRC checksum fail."));
            }
        }
    }

    removeRecordFile();

    return completeOutcome;
}