in sdk/src/resumable/ResumableUploader.cc [66:217]
PutObjectOutcome ResumableUploader::Upload()
{
OssError err;
if (0 != validate(err)) {
return PutObjectOutcome(err);
}
PartList partsToUpload;
PartList uploadedParts;
if (getPartsToUpload(err, uploadedParts, partsToUpload) != 0){
return PutObjectOutcome(err);
}
std::vector<PutObjectOutcome> outcomes;
std::vector<std::thread> threadPool;
for (uint32_t i = 0; i < request_.ThreadNum(); i++) {
threadPool.emplace_back(std::thread([&]() {
Part part;
while (true) {
{
std::lock_guard<std::mutex> lck(lock_);
if (partsToUpload.empty())
break;
part = partsToUpload.front();
partsToUpload.erase(partsToUpload.begin());
}
if (!client_->isEnableRequest())
break;
uint64_t offset = partSize_ * (part.PartNumber() - 1);
uint64_t length = part.Size();
auto content = GetFstreamByPath(request_.FilePath(), request_.FilePathW(),
std::ios::in | std::ios::binary);
content->seekg(offset, content->beg);
UploadPartRequest uploadPartRequest(request_.Bucket(), request_.Key(), part.PartNumber(), uploadID_, content);
uploadPartRequest.setContentLength(length);
UploaderTransferState transferState;
auto process = request_.TransferProgress();
if (process.Handler) {
transferState.transfered = 0;
transferState.userData = (void *)this;
TransferProgress uploadPartProcess = { UploadPartProcessCallback, (void *)&transferState };
uploadPartRequest.setTransferProgress(uploadPartProcess);
}
if (request_.RequestPayer() == RequestPayer::Requester) {
uploadPartRequest.setRequestPayer(request_.RequestPayer());
}
if (request_.TrafficLimit() != 0) {
uploadPartRequest.setTrafficLimit(request_.TrafficLimit());
}
auto outcome = UploadPartWrap(uploadPartRequest);
#ifdef ENABLE_OSS_TEST
if (!!(request_.Flags() & 0x40000000) && part.PartNumber() == 2) {
const char* TAG = "ResumableUploadObjectClient";
OSS_LOG(LogLevel::LogDebug, TAG, "NO.2 part data upload failed.");
outcome = PutObjectOutcome();
}
#endif // ENABLE_OSS_TEST
if (outcome.isSuccess()) {
part.eTag_ = outcome.result().ETag();
part.cRC64_ = outcome.result().CRC64();
}
//lock
{
std::lock_guard<std::mutex> lck(lock_);
uploadedParts.push_back(part);
outcomes.push_back(outcome);
}
}
}));
}
for (auto& worker:threadPool) {
if(worker.joinable()){
worker.join();
}
}
if (!client_->isEnableRequest()) {
return PutObjectOutcome(OssError("ClientError:100002", "Disable all requests by upper."));
}
for (const auto& outcome : outcomes) {
if (!outcome.isSuccess()) {
return PutObjectOutcome(outcome.error());
}
}
// sort uploadedParts
std::sort(uploadedParts.begin(), uploadedParts.end(), [&](const Part& a, const Part& b)
{
return a.PartNumber() < b.PartNumber();
});
CompleteMultipartUploadRequest completeMultipartUploadReq(request_.Bucket(), request_.Key(), uploadedParts, uploadID_);
if (request_.MetaData().hasHeader("x-oss-object-acl")) {
completeMultipartUploadReq.MetaData().HttpMetaData()["x-oss-object-acl"] =
request_.MetaData().HttpMetaData().at("x-oss-object-acl");
}
if (!request_.EncodingType().empty()) {
completeMultipartUploadReq.setEncodingType(request_.EncodingType());
}
if (request_.MetaData().hasHeader("x-oss-callback")) {
completeMultipartUploadReq.MetaData().HttpMetaData()["x-oss-callback"] =
request_.MetaData().HttpMetaData().at("x-oss-callback");
if (request_.MetaData().hasHeader("x-oss-callback-var")) {
completeMultipartUploadReq.MetaData().HttpMetaData()["x-oss-callback-var"] =
request_.MetaData().HttpMetaData().at("x-oss-callback-var");
}
if (request_.MetaData().hasHeader("x-oss-pub-key-url")) {
completeMultipartUploadReq.MetaData().HttpMetaData()["x-oss-pub-key-url"] =
request_.MetaData().HttpMetaData().at("x-oss-pub-key-url");
}
}
if (request_.RequestPayer() == RequestPayer::Requester) {
completeMultipartUploadReq.setRequestPayer(request_.RequestPayer());
}
auto outcome = CompleteMultipartUploadWrap(completeMultipartUploadReq);
if (!outcome.isSuccess()) {
return PutObjectOutcome(outcome.error());
}
// crc
uint64_t localCRC64 = uploadedParts[0].CRC64();
for (size_t i = 1; i < uploadedParts.size(); i++) {
localCRC64 = CRC64::CombineCRC(localCRC64, uploadedParts[i].CRC64(), uploadedParts[i].Size());
}
uint64_t ossCRC64 = outcome.result().CRC64();
if (ossCRC64 != 0 && localCRC64 != ossCRC64) {
return PutObjectOutcome(OssError("CrcCheckError", "ResumableUpload Object CRC Checksum fail."));
}
removeRecordFile();
HeaderCollection headers;
headers[Http::ETAG] = outcome.result().ETag();
headers["x-oss-hash-crc64ecma"] = std::to_string(outcome.result().CRC64());
headers["x-oss-request-id"] = outcome.result().RequestId();
if (!outcome.result().VersionId().empty()) {
headers["x-oss-version-id"] = outcome.result().VersionId();
}
return PutObjectOutcome(PutObjectResult(headers, outcome.result().Content()));
}