in sdk/src/resumable/ResumableDownloader.cc [31:274]
/**
 * Downloads the remaining parts of the file into the temp file using
 * request_.ThreadNum() worker threads, then verifies and finalizes it.
 *
 * Steps:
 *   1. validate() the request and compute the parts still to download.
 *   2. Fetch a download URL when url_ is empty.
 *   3. Let the workers pull parts from a shared queue; each part is written
 *      into the temp file at its own position. After every successful part
 *      the record file is rewritten (when a record path is configured).
 *   4. After all workers have joined: honor disable/stop/cancel requests,
 *      surface the first failed part, optionally check the file size and the
 *      combined CRC64, rename the temp file and remove the record file.
 *
 * @return A successful outcome holding a default-constructed DataGetResult,
 *         or an outcome carrying the PdsError of the first failing step.
 */
DataGetOutcome ResumableDownloader::Download()
{
    PdsError err;
    if (0 != validate(err)) {
        return DataGetOutcome(err);
    }

    PartRecordList partsToDownload;
    if (getPartsToDownload(err, partsToDownload) != 0) {
        return DataGetOutcome(err);
    }

    if (url_.empty()) {
        FileGetDownloadUrlRequest getDownloadUrlRequest(record_.driveID, record_.shareID, record_.fileID);
        getDownloadUrlRequest.setShareToken(request_.ShareToken());
        auto getDownloadUrlOutcome = FileGetDownloadUrlWrap(getDownloadUrlRequest);
        if (!getDownloadUrlOutcome.isSuccess()) {
            return DataGetOutcome(getDownloadUrlOutcome.error());
        }
        url_ = getDownloadUrlOutcome.result().Url();
    }

    // Parts already present in the record count as downloaded.
    PartRecordList downloadedParts;
    if (hasRecord_) {
        downloadedParts = record_.parts;
    }

    // partsToDownload, downloadedParts, outcomes, url_ and the record file are
    // shared between the workers and are only touched while holding lock_.
    std::vector<DataGetOutcome> outcomes;
    std::vector<std::thread> threadPool;
    for (uint32_t i = 0; i < request_.ThreadNum(); i++) {
        threadPool.emplace_back(std::thread([&]() {
            PartRecord part;
            // Per-thread copy of url_, taken under lock_, so that a concurrent
            // refresh by another worker never races with a read of url_.
            decltype(url_) currentUrl;
            while (true) {
                {
                    std::lock_guard<std::mutex> lck(lock_);
                    if (partsToDownload.empty())
                        break;
                    part = partsToDownload.front();
                    partsToDownload.erase(partsToDownload.begin());
                    currentUrl = url_;
                }

                if (!client_->isEnableRequest())
                    break;

                // check resumable progress control
                if (DownloadPartProcessControlCallback((void *)this) != 0)
                    break;

                // pos: write position inside the temp file; start/end: byte
                // range requested from the server (end is inclusive).
                uint64_t pos = partSize_ * (part.partNumber - 1);
                uint64_t start = part.offset;
                uint64_t end = start + part.size - 1;

                // Opens the temp file and seeks to this part's write position.
                // Used for the first attempt and for the retry after a URL refresh.
                auto openPartStream = [this, pos]() {
                    auto tmpFstream = GetFstreamByPath(request_.TempFilePath(), request_.TempFilePathW(),
                        std::ios_base::in | std::ios_base::out | std::ios_base::binary);
                    tmpFstream->seekp(pos, tmpFstream->beg);
                    return tmpFstream;
                };

                auto getDataReq = DataGetByUrlRequest(currentUrl);
                getDataReq.setResponseStreamFactory(openPartStream);
                getDataReq.setRange(start, end);
                if (!crc64Hash_.empty()) {
                    getDataReq.setFlags(getDataReq.Flags() | REQUEST_FLAG_CHECK_CRC64 | REQUEST_FLAG_SAVE_CLIENT_CRC64);
                }
                auto process = request_.TransferProgress();
                if (process.Handler) {
                    TransferProgress downloadPartProcess = { DownloadPartProcessCallback, (void *)this };
                    getDataReq.setTransferProgress(downloadPartProcess);
                }
                auto progressControl = request_.ProgressControl();
                if (progressControl.Handler) {
                    ProgressControl downloadPartProgressControl = { DownloadPartProcessControlCallback, (void *)this };
                    getDataReq.setProgressControl(downloadPartProgressControl);
                }
                if (request_.TrafficLimit() != 0) {
                    getDataReq.setTrafficLimit(request_.TrafficLimit());
                }

                auto outcome = DataGetByUrlWrap(getDataReq);

                // Refresh the download URL when the server reports it as expired.
                bool needRetry = false;
                {
                    std::lock_guard<std::mutex> lck(lock_);
                    // std::string::find returns npos (not 0/false) when the text is
                    // absent, so the result must be compared against npos explicitly.
                    const bool urlExpired = !outcome.isSuccess()
                        && outcome.error().Code() == "AccessDenied"
                        && outcome.error().Message().find("expired") != std::string::npos;
                    if (urlExpired) {
                        FileGetDownloadUrlRequest getDownloadUrlRequest(record_.driveID, record_.shareID, record_.fileID);
                        getDownloadUrlRequest.setShareToken(request_.ShareToken());
                        auto getDownloadUrlOutcome = FileGetDownloadUrlWrap(getDownloadUrlRequest);
                        if (!getDownloadUrlOutcome.isSuccess()) {
                            outcomes.push_back(getDownloadUrlOutcome.error());
                            break;
                        }
                        // check file content-hash
                        auto contentHash = getDownloadUrlOutcome.result().ContentHash();
                        auto fileSize = getDownloadUrlOutcome.result().Size();
                        if (contentHash != contentHash_ || fileSize != (int64_t)fileSize_) {
                            outcomes.push_back(PdsError("SourceFileModified","Source file has been modified since last download."));
                            break;
                        }
                        url_ = getDownloadUrlOutcome.result().Url();
                        currentUrl = url_;
                        needRetry = true;
                    }
                }

                if (needRetry) {
                    getDataReq.setUrl(currentUrl);
                    getDataReq.setResponseStreamFactory(openPartStream);
                    outcome = DataGetByUrlWrap(getDataReq);
                }

#ifdef ENABLE_PDS_TEST
                if (!!(request_.Flags() & 0x40000000) && part.partNumber == 2) {
                    const char* TAG = "ResumableDownloadClient";
                    PDS_LOG(LogLevel::LogDebug, TAG, "NO.2 part data download failed.");
                    outcome = DataGetOutcome();
                }
#endif // ENABLE_PDS_TEST

                // Publish the result of this part and persist the progress.
                {
                    std::lock_guard<std::mutex> lck(lock_);
                    if (outcome.isSuccess()) {
                        if (!crc64Hash_.empty()) {
                            // NOTE(review): at() throws std::out_of_range when the header is
                            // missing; inside a worker thread that is not caught here.
                            part.crc64 = std::strtoull(outcome.result().Metadata().HttpMetaData().at("x-oss-hash-crc64ecma-by-client").c_str(), nullptr, 10);
                        }
                        downloadedParts.push_back(part);
                    }
                    outcomes.push_back(outcome);

                    //update record
                    if (hasRecordPath() && outcome.isSuccess()) {
                        auto &record = record_;
                        record.parts = downloadedParts;
                        Json::Value root;
                        root["opType"] = record.opType;
                        root["driveID"] = record.driveID;
                        root["shareID"] = record.shareID;
                        root["fileID"] = record.fileID;
                        root["contentHash"] = record.contentHash;
                        root["filePath"] = record.filePath;
                        root["mtime"] = record.mtime;
                        root["size"] = record.size;
                        root["partSize"] = record.partSize;
                        int index = 0;
                        for (PartRecord& partR : record.parts) {
                            root["parts"][index]["partNumber"] = partR.partNumber;
                            root["parts"][index]["size"] = partR.size;
                            root["parts"][index]["crc64"] = partR.crc64;
                            index++;
                        }
                        // md5Sum is computed over the serialized record before the
                        // md5Sum field itself is added.
                        std::stringstream ss;
                        ss << root;
                        std::string md5Sum = ComputeContentETag(ss);
                        root["md5Sum"] = md5Sum;
                        auto recordStream = GetFstreamByPath(recordPath_, recordPathW_, std::ios::out);
                        if (recordStream->is_open()) {
                            *recordStream << root;
                            recordStream->close();
                        }
                    }
                }
            }
        }));
    }

    for (auto& worker : threadPool) {
        if (worker.joinable()) {
            worker.join();
        }
    }

    if (!client_->isEnableRequest()) {
        return DataGetOutcome(PdsError("ClientError:100002", "Disable all requests by upper."));
    }

    int32_t controlFlag = DownloadPartProcessControlCallback((void *)this);
    if (controlFlag == ProgressControlStop) {
        return DataGetOutcome(PdsError("ClientError:100003", "Download stop by upper."));
    }
    if (controlFlag == ProgressControlCancel) {
        // A cancel discards both the progress record and the partial data.
        removeRecordFile();
        removeTempFile();
        return DataGetOutcome(PdsError("ClientError:100004", "Download cancel by upper."));
    }

    // Report the first failed part; reset the content of the successful ones.
    std::shared_ptr<std::iostream> content = nullptr;
    for (auto& outcome : outcomes) {
        if (!outcome.isSuccess()) {
            return DataGetOutcome(outcome.error());
        }
        outcome.result().setContent(content);
    }

    if (downloadedParts.size() < outcomes.size()) {
        return DataGetOutcome(PdsError("DownloadNotComplete", "Not all parts are downloaded."));
    }

    // Parts finish in arbitrary order; the CRC combination below needs them
    // ordered by part number.
    std::sort(downloadedParts.begin(), downloadedParts.end(), [](const PartRecord& a, const PartRecord& b)
    {
        return a.partNumber < b.partNumber;
    });

    // check size
    if (client_->configuration().enableCheckDownloadFileSize) {
        uint64_t localFileSize = GetFileSize(request_.TempFilePath(), request_.TempFilePathW());
        if (fileSize_ != localFileSize) {
            return DataGetOutcome(PdsError("FileSizeCheckError", "Resumable Download data check size fail."));
        }
    }

    // check crc64
    if (client_->configuration().enableCrc64 && !crc64Hash_.empty()) {
        // Start from 0 so that an empty part list is never indexed.
        uint64_t localCRC64 = 0;
        if (!downloadedParts.empty()) {
            localCRC64 = downloadedParts[0].crc64;
        }
        for (size_t i = 1; i < downloadedParts.size(); i++) {
            localCRC64 = CRC64::CombineCRC(localCRC64, downloadedParts[i].crc64, downloadedParts[i].size);
        }
        uint64_t serverCRC64 = std::strtoull(crc64Hash_.c_str(), nullptr, 10);
        if (localCRC64 != serverCRC64) {
            removeRecordFile();
            return DataGetOutcome(PdsError("CrcCheckError", "Resumable Download data CRC checksum fail."));
        }
    }

    if (!renameTempFile()) {
        std::stringstream ss;
        ss << "rename temp file failed";
        return DataGetOutcome(PdsError("RenameError", ss.str()));
    }

    removeRecordFile();
    DataGetResult result;
    return DataGetOutcome(result);
}