in sdk/src/resumable/ResumableCopier.cc [34:172]
CopyObjectOutcome ResumableCopier::Copy()
{
OssError err;
if (0 != validate(err)) {
return CopyObjectOutcome(err);
}
PartList partsToUploadCopy;
PartList partsCopied;
if (getPartsToUploadCopy(err, partsCopied, partsToUploadCopy) != 0) {
return CopyObjectOutcome(err);
}
std::vector<UploadPartCopyOutcome> outcomes;
std::vector<std::thread> threadPool;
for (uint32_t i = 0; i < request_.ThreadNum(); i++) {
threadPool.emplace_back(std::thread([&]() {
Part part;
while (true) {
{
std::lock_guard<std::mutex> lck(lock_);
if (partsToUploadCopy.empty())
break;
part = partsToUploadCopy.front();
partsToUploadCopy.erase(partsToUploadCopy.begin());
}
if (!client_->isEnableRequest())
break;
uint64_t offset = partSize_ * (part.PartNumber() - 1);
uint64_t length = part.Size();
auto uploadPartCopyReq = UploadPartCopyRequest(request_.Bucket(), request_.Key(), request_.SrcBucket(), request_.SrcKey(),
uploadID_, part.PartNumber(),
request_.SourceIfMatchEtag(), request_.SourceIfNotMatchEtag(),
request_.SourceIfModifiedSince(), request_.SourceIfUnModifiedSince());
uploadPartCopyReq.setCopySourceRange(offset, offset + length - 1);
if (request_.RequestPayer() == RequestPayer::Requester) {
uploadPartCopyReq.setRequestPayer(request_.RequestPayer());
}
if (request_.TrafficLimit() != 0) {
uploadPartCopyReq.setTrafficLimit(request_.TrafficLimit());
}
if (!request_.VersionId().empty()) {
uploadPartCopyReq.setVersionId(request_.VersionId());
}
auto outcome = client_->UploadPartCopy(uploadPartCopyReq);
#ifdef ENABLE_OSS_TEST
if (!!(request_.Flags() & 0x40000000) && (part.PartNumber() == 2 || part.PartNumber() == 4)) {
const char* TAG = "ResumableCopyObjectClient";
OSS_LOG(LogLevel::LogDebug, TAG, "NO.%d part data copy failed!", part.PartNumber());
outcome = UploadPartCopyOutcome();
}
#endif // ENABLE_OSS_TEST
//lock
{
std::lock_guard<std::mutex> lck(lock_);
if (outcome.isSuccess()) {
part.eTag_ = outcome.result().ETag();
partsCopied.push_back(part);
}
outcomes.push_back(outcome);
if (outcome.isSuccess()) {
auto process = request_.TransferProgress();
if (process.Handler) {
consumedSize_ += length;
process.Handler((size_t)length, consumedSize_, objectSize_, process.UserData);
}
}
}
}
}));
}
for (auto& worker : threadPool) {
if (worker.joinable()) {
worker.join();
}
}
for (const auto& outcome : outcomes) {
if (!outcome.isSuccess()) {
return CopyObjectOutcome(outcome.error());
}
}
if (!client_->isEnableRequest()) {
return CopyObjectOutcome(OssError("ClientError:100002", "Disable all requests by upper."));
}
// sort partsCopied
std::sort(partsCopied.begin(), partsCopied.end(), [](const Part& a, const Part& b)
{
return a.PartNumber() < b.PartNumber();
});
CompleteMultipartUploadRequest completeMultipartUploadReq(request_.Bucket(), request_.Key(), partsCopied, uploadID_);
if (request_.MetaData().HttpMetaData().find("x-oss-object-acl")
!= request_.MetaData().HttpMetaData().end()) {
std::string aclName = request_.MetaData().HttpMetaData().at("x-oss-object-acl");
completeMultipartUploadReq.setAcl(ToAclType(aclName.c_str()));
}
if (!request_.EncodingType().empty()) {
completeMultipartUploadReq.setEncodingType(request_.EncodingType());
}
if (request_.RequestPayer() == RequestPayer::Requester) {
completeMultipartUploadReq.setRequestPayer(request_.RequestPayer());
}
auto compOutcome = client_->CompleteMultipartUpload(completeMultipartUploadReq);
if (!compOutcome.isSuccess()) {
return CopyObjectOutcome(compOutcome.error());
}
removeRecordFile();
CopyObjectResult result;
HeadObjectRequest hRequest(request_.Bucket(), request_.Key());
if (request_.RequestPayer() == RequestPayer::Requester) {
hRequest.setRequestPayer(request_.RequestPayer());
}
if (!compOutcome.result().VersionId().empty()) {
hRequest.setVersionId(compOutcome.result().VersionId());
}
auto hOutcome = client_->HeadObject(HeadObjectRequest(hRequest));
if (hOutcome.isSuccess()) {
result.setLastModified(hOutcome.result().LastModified());
}
result.setEtag(compOutcome.result().ETag());
result.setRequestId(compOutcome.result().RequestId());
result.setVersionId(compOutcome.result().VersionId());
return CopyObjectOutcome(result);
}