in sdk/src/resumable/ResumableDownloader.cc [37:248]
GetObjectOutcome ResumableDownloader::Download()
{
OssError err;
if (0 != validate(err)) {
return GetObjectOutcome(err);
}
PartRecordList partsToDownload;
if (getPartsToDownload(err, partsToDownload) != 0) {
return GetObjectOutcome(err);
}
//task queue
PartRecordList downloadedParts;
if (hasRecord_) {
downloadedParts = record_.parts;
}
std::vector<GetObjectOutcome> outcomes;
std::vector<std::thread> threadPool;
for (uint32_t i = 0; i < request_.ThreadNum(); i++) {
threadPool.emplace_back(std::thread([&]() {
PartRecord part;
while (true) {
{
std::lock_guard<std::mutex> lck(lock_);
if (partsToDownload.empty())
break;
part = partsToDownload.front();
partsToDownload.erase(partsToDownload.begin());
}
if (!client_->isEnableRequest())
break;
uint64_t pos = partSize_ * (part.partNumber - 1);
uint64_t start = part.offset;
uint64_t end = start + part.size - 1;
auto getObjectReq = GetObjectRequest(request_.Bucket(), request_.Key(), request_.ModifiedSinceConstraint(), request_.UnmodifiedSinceConstraint(),
request_.MatchingETagsConstraint(), request_.NonmatchingETagsConstraint(), request_.ResponseHeaderParameters());
getObjectReq.setResponseStreamFactory([=]() {
auto tmpFstream = GetFstreamByPath(request_.TempFilePath(), request_.TempFilePathW(),
std::ios_base::in | std::ios_base::out | std::ios_base::binary);
tmpFstream->seekp(pos, tmpFstream->beg);
return tmpFstream;
});
getObjectReq.setRange(start, end);
getObjectReq.setFlags(getObjectReq.Flags() | REQUEST_FLAG_CHECK_CRC64 | REQUEST_FLAG_SAVE_CLIENT_CRC64);
DownloaderTransferState transferState;
auto process = request_.TransferProgress();
if (process.Handler) {
transferState.transfered = 0;
transferState.userData = (void *)this;
TransferProgress uploadPartProcess = { DownloadPartProcessCallback, (void *)&transferState };
getObjectReq.setTransferProgress(uploadPartProcess);
}
if (request_.RequestPayer() == RequestPayer::Requester) {
getObjectReq.setRequestPayer(request_.RequestPayer());
}
if (request_.TrafficLimit() != 0) {
getObjectReq.setTrafficLimit(request_.TrafficLimit());
}
if (!request_.VersionId().empty()) {
getObjectReq.setVersionId(request_.VersionId());
}
auto outcome = GetObjectWrap(getObjectReq);
#ifdef ENABLE_OSS_TEST
if (!!(request_.Flags() & 0x40000000) && part.partNumber == 2) {
const char* TAG = "ResumableDownloadObjectClient";
OSS_LOG(LogLevel::LogDebug, TAG, "NO.2 part data download failed.");
outcome = GetObjectOutcome();
}
#endif // ENABLE_OSS_TEST
// lock
{
std::lock_guard<std::mutex> lck(lock_);
if (outcome.isSuccess()) {
part.crc64 = std::strtoull(outcome.result().Metadata().HttpMetaData().at("x-oss-hash-crc64ecma-by-client").c_str(), nullptr, 10);
downloadedParts.push_back(part);
}
outcomes.push_back(outcome);
//update record
if (hasRecordPath() && outcome.isSuccess()) {
auto &record = record_;
record.parts = downloadedParts;
Json::Value root;
root["opType"] = record.opType;
root["bucket"] = record.bucket;
root["key"] = record.key;
root["filePath"] = record.filePath;
root["mtime"] = record.mtime;
root["size"] = record.size;
root["partSize"] = record.partSize;
int index = 0;
for (PartRecord& partR : record.parts) {
root["parts"][index]["partNumber"] = partR.partNumber;
root["parts"][index]["size"] = partR.size;
root["parts"][index]["crc64"] = partR.crc64;
index++;
}
std::stringstream ss;
ss << root;
std::string md5Sum = ComputeContentETag(ss);
root["md5Sum"] = md5Sum;
if (request_.RangeIsSet()) {
root["rangeStart"] = record.rangeStart;
root["rangeEnd"] = record.rangeEnd;
}
auto recordStream = GetFstreamByPath(recordPath_, recordPathW_, std::ios::out);
if (recordStream->is_open()) {
*recordStream << root;
recordStream->close();
}
}
}
}
}));
}
for (auto& worker : threadPool) {
if (worker.joinable()) {
worker.join();
}
}
std::shared_ptr<std::iostream> content = nullptr;
for (auto& outcome : outcomes) {
if (!outcome.isSuccess()) {
return GetObjectOutcome(outcome.error());
}
outcome.result().setContent(content);
}
if (!client_->isEnableRequest()) {
return GetObjectOutcome(OssError("ClientError:100002", "Disable all requests by upper."));
}
// sort
std::sort(downloadedParts.begin(), downloadedParts.end(), [&](const PartRecord& a, const PartRecord& b)
{
return a.partNumber < b.partNumber;
});
ObjectMetaData meta;
if (outcomes.empty()) {
HeadObjectRequest hRequest(request_.Bucket(), request_.Key());
if (request_.RequestPayer() == RequestPayer::Requester) {
hRequest.setRequestPayer(request_.RequestPayer());
}
if (!request_.VersionId().empty()) {
hRequest.setVersionId(request_.VersionId());
}
auto hOutcome = client_->HeadObject(hRequest);
if (!hOutcome.isSuccess()) {
return GetObjectOutcome(hOutcome.error());
}
meta = hOutcome.result();
}
else {
meta = outcomes[0].result().Metadata();
}
meta.setContentLength(contentLength_);
//check crc and update metadata
if (!request_.RangeIsSet()) {
if (client_->configuration().enableCrc64) {
uint64_t localCRC64 = downloadedParts[0].crc64;
for (size_t i = 1; i < downloadedParts.size(); i++) {
localCRC64 = CRC64::CombineCRC(localCRC64, downloadedParts[i].crc64, downloadedParts[i].size);
}
if (localCRC64 != meta.CRC64()) {
return GetObjectOutcome(OssError("CrcCheckError", "ResumableDownload object CRC checksum fail."));
}
}
meta.HttpMetaData().erase(Http::CONTENT_RANGE);
}
else {
std::stringstream ss;
ss << "bytes " << std::to_string(request_.RangeStart()) << "-";
if (request_.RangeEnd() != -1) {
ss << std::to_string(request_.RangeEnd()) << "/" << std::to_string(objectSize_);
}
else {
ss << std::to_string(objectSize_ - 1) << "/" << std::to_string(objectSize_);
}
meta.HttpMetaData()["Content-Range"] = ss.str();
}
if (meta.HttpMetaData().find("x-oss-hash-crc64ecma-by-client") != meta.HttpMetaData().end()) {
meta.HttpMetaData().erase("x-oss-hash-crc64ecma-by-client");
}
if (!renameTempFile()) {
std::stringstream ss;
ss << "rename temp file failed";
return GetObjectOutcome(OssError("RenameError", ss.str()));
}
removeRecordFile();
GetObjectResult result(request_.Bucket(), request_.Key(), meta);
return GetObjectOutcome(result);
}