in sdk/src/resumable/ResumableUploader.cc [65:283]
// Uploads the pending parts of the file one after another, persists the
// resumable record after every accepted part, completes the upload on the
// server and finally verifies the size and checksum reported by the server.
//
// Returns the outcome of the complete request on success. Otherwise returns a
// FileCompleteOutcome carrying the first error met:
//   - the outcome filled in by validate() or the error from getPartsToUpload();
//   - the error of a failed get-upload-url / put-part / complete request;
//   - "GetUploadUrlError"   : the server returned no upload url for the part;
//   - "ClientError:100002"  : requests were disabled on the client;
//   - "ClientError:100003"  : the progress control callback asked to stop;
//   - "ClientError:100004"  : the progress control callback asked to cancel
//                             (the record file is removed);
//   - "UploadNotComplete"   : fewer parts were recorded than were attempted;
//   - "FileSizeCheckError" / "CrcCheckError" / "Sha1CheckError" : the data
//     reported by the server does not match the local file.
FileCompleteOutcome ResumableUploader::Upload()
{
    FileCompleteOutcome validateOutcome;
    if (0 != validate(validateOutcome)) {
        return validateOutcome;
    }

    PdsError err;
    UploadPartRecordList partsToUpload;
    UploadPartRecordList uploadedParts;
    if (getPartsToUpload(err, uploadedParts, partsToUpload) != 0) {
        return FileCompleteOutcome(err);
    }

    // A failed put whose error says the part already exists in a sequential
    // upload: the local record lacks this part although it was uploaded before.
    // std::string::find() returns npos (not 0/false) when the text is absent,
    // so the result has to be compared against npos explicitly.
    const auto isDuplicatePart = [](const DataPutOutcome& putOutcome) {
        return !putOutcome.isSuccess() &&
               putOutcome.error().Code() == "PartAlreadyExist" &&
               putOutcome.error().Message().find("sequential") != std::string::npos;
    };

    std::vector<DataPutOutcome> outcomes;

    // Upload the parts sequentially.
    UploadPartRecord part;
    while (true) {
        // need lock when parallel upload
        {
            // std::lock_guard<std::mutex> lck(lock_);
            if (partsToUpload.empty()) {
                break;
            }
            part = partsToUpload.front();
            partsToUpload.erase(partsToUpload.begin());
        }

        if (!client_->isEnableRequest()) {
            break;
        }

        // check resumable progress control
        if (UploadPartProcessControlCallback(static_cast<void *>(this)) != 0) {
            break;
        }

        // Part numbers start at 1, hence the "- 1" when computing the offset.
        const uint64_t offset = partSize_ * (part.partNumber - 1);
        const uint64_t length = part.size;

        // NOTE(review): the stream is not checked for a failed open before use;
        // confirm that the put request reports an error in that case.
        auto content = GetFstreamByPath(request_.FilePath(), request_.FilePathW(),
                                        std::ios::in | std::ios::binary);
        content->seekg(offset, content->beg);

        // Ask the server for the upload url of this single part.
        PartInfoReqList partInfoReqList;
        PartInfoReq info(part.partNumber, part.size, offset, offset + length);
        partInfoReqList.push_back(info);
        FileGetUploadUrlRequest getUploadPartUrlRequest(record_.driveID, record_.fileID,
                                                        record_.uploadID, partInfoReqList);
        auto getUploadPartUrlOutcome = FileGetUploadUrlWrap(getUploadPartUrlRequest);
        if (!getUploadPartUrlOutcome.isSuccess()) {
            return FileCompleteOutcome(getUploadPartUrlOutcome.error());
        }
        auto partInfoResp = getUploadPartUrlOutcome.result().PartInfoRespList();
        if (partInfoResp.empty()) {
            return FileCompleteOutcome(PdsError("GetUploadUrlError", "Get Upload url empty."));
        }

        DataPutByUrlRequest putPartRequest(partInfoResp[0].UploadUrl(), content);
        putPartRequest.setContentLength(length);
        putPartRequest.setFlags(putPartRequest.Flags() | REQUEST_FLAG_CHECK_CRC64 | REQUEST_FLAG_SAVE_CLIENT_CRC64);

        // Forward progress / control notifications through this uploader only
        // when the caller installed a handler on the original request.
        auto process = request_.TransferProgress();
        if (process.Handler) {
            TransferProgress uploadPartProcess = { UploadPartProcessCallback, static_cast<void *>(this) };
            putPartRequest.setTransferProgress(uploadPartProcess);
        }
        auto progressControl = request_.ProgressControl();
        if (progressControl.Handler) {
            ProgressControl uploadPartProgressControl = { UploadPartProcessControlCallback, static_cast<void *>(this) };
            putPartRequest.setProgressControl(uploadPartProgressControl);
        }
        if (request_.TrafficLimit() != 0) {
            putPartRequest.setTrafficLimit(request_.TrafficLimit());
        }

        auto putPartOutcome = UploadPartWrap(putPartRequest);
#ifdef ENABLE_PDS_TEST
        // Test hook: simulate a failed upload of part 2 when the flag is set.
        if (!!(request_.Flags() & 0x40000000) && part.partNumber == 2) {
            const char* TAG = "ResumableUploadClient";
            PDS_LOG(LogLevel::LogDebug, TAG, "NO.2 part data upload failed.");
            putPartOutcome = DataPutOutcome();
        }
#endif // ENABLE_PDS_TEST

        // local record lack of this uploaded part info, cause repeated upload
        const bool partAlreadyExist = isDuplicatePart(putPartOutcome);
        const bool partAccepted = putPartOutcome.isSuccess() || partAlreadyExist;

        // need lock when parallel upload
        {
            // std::lock_guard<std::mutex> lck(lock_);
            if (partAccepted) {
                // NOTE(review): for an already-existing part the outcome failed,
                // so result() presumably carries no real CRC64; the SHA1 check
                // after completion then acts as the fallback - confirm.
                part.crc64 = putPartOutcome.result().CRC64();
                uploadedParts.push_back(part);
            }
            outcomes.push_back(putPartOutcome);

            // update record
            if (hasRecordPath() && partAccepted) {
                auto &record = record_;
                record.parts = uploadedParts;

                Json::Value root;
                root["opType"] = record_.opType;
                root["driveID"] = record_.driveID;
                root["fileID"] = record_.fileID;
                root["uploadID"] = record_.uploadID;
                root["name"] = record_.name;
                root["filePath"] = record_.filePath;
                root["taskID"] = record_.taskID;
                root["mtime"] = record_.mtime;
                root["size"] = record_.size;
                root["partSize"] = record_.partSize;

                int index = 0;
                for (UploadPartRecord& partR : record.parts) {
                    root["parts"][index]["partNumber"] = partR.partNumber;
                    root["parts"][index]["size"] = partR.size;
                    root["parts"][index]["crc64"] = partR.crc64;
                    index++;
                }

                // The checksum is computed over the record serialized without
                // the "md5Sum" field and then stored inside the record.
                std::stringstream ss;
                ss << root;
                std::string md5Sum = ComputeContentETag(ss);
                root["md5Sum"] = md5Sum;

                // Best effort: a record file that cannot be opened is skipped.
                auto recordStream = GetFstreamByPath(recordPath_, recordPathW_, std::ios::out);
                if (recordStream->is_open()) {
                    *recordStream << root;
                    recordStream->close();
                }
            }
        }
    }

    // The loop may have been left early; report why before going any further.
    if (!client_->isEnableRequest()) {
        return FileCompleteOutcome(PdsError("ClientError:100002", "Disable all requests by upper."));
    }

    const int32_t controlFlag = UploadPartProcessControlCallback(static_cast<void *>(this));
    if (controlFlag == ProgressControlStop) {
        return FileCompleteOutcome(PdsError("ClientError:100003", "Upload stop by upper."));
    }
    if (controlFlag == ProgressControlCancel) {
        removeRecordFile();
        return FileCompleteOutcome(PdsError("ClientError:100004", "Upload cancel by upper."));
    }

    // Surface the first real part failure; already-existing parts are ignored.
    for (const auto& outcome : outcomes) {
        if (outcome.isSuccess() || isDuplicatePart(outcome)) {
            continue;
        }
        return FileCompleteOutcome(outcome.error());
    }

    if (uploadedParts.size() < outcomes.size()) {
        return FileCompleteOutcome(PdsError("UploadNotComplete", "Not all parts are uploaded."));
    }

    // sort uploadedParts so the per-part CRCs are combined in part order
    std::sort(uploadedParts.begin(), uploadedParts.end(),
              [](const UploadPartRecord& a, const UploadPartRecord& b)
    {
        return a.partNumber < b.partNumber;
    });

    FileCompleteRequest completeReq(record_.driveID, record_.fileID, record_.uploadID);
    auto completeOutcome = FileCompleteWrap(completeReq);
    if (!completeOutcome.isSuccess()) {
        removeRecordFile();
        return FileCompleteOutcome(completeOutcome.error());
    }

    // check size
    const uint64_t uploadedfileSize = completeOutcome.result().Size();
    if (fileSize_ != uploadedfileSize) {
        return FileCompleteOutcome(PdsError("FileSizeCheckError", "Resumable Upload data check size fail."));
    }

    // check crc64
    std::string crc64Hash = completeOutcome.result().Crc64Hash();
    if (client_->configuration().enableCrc64 && !crc64Hash.empty()) {
        // Guard against an empty part list instead of indexing element 0 blindly.
        uint64_t localCRC64 = uploadedParts.empty() ? 0 : uploadedParts[0].crc64;
        for (size_t i = 1; i < uploadedParts.size(); i++) {
            localCRC64 = CRC64::CombineCRC(localCRC64, uploadedParts[i].crc64, uploadedParts[i].size);
        }

        const uint64_t serverCRC64 = std::strtoull(crc64Hash.c_str(), nullptr, 10);
        if (localCRC64 != serverCRC64) {
            // check sha1: fall back to a case-insensitive comparison of the
            // server's content hash with the SHA1 computed locally.
            std::string cHash = completeOutcome.result().ContentHash();
            std::string cHashName = completeOutcome.result().ContentHashName();
            if (cHashName == "sha1" && !cHash.empty()) {
                std::string hashSHA1;
                if (0 != ComputeFileSha1(completeOutcome, hashSHA1)) {
                    return completeOutcome;
                }
                std::transform(cHash.begin(), cHash.end(), cHash.begin(), ::toupper);
                std::transform(hashSHA1.begin(), hashSHA1.end(), hashSHA1.begin(), ::toupper);
                if (0 != cHash.compare(hashSHA1)) {
                    removeRecordFile();
                    return FileCompleteOutcome(PdsError("Sha1CheckError", "Resumable Upload data SHA1 checksum fail."));
                }
            }
            else {
                removeRecordFile();
                return FileCompleteOutcome(PdsError("CrcCheckError", "Resumable Upload data CRC checksum fail."));
            }
        }
    }

    removeRecordFile();
    return completeOutcome;
}