in ptest/src/Program.cc [264:368]
/**
 * Uploads a local file to Config::BucketName as a multipart upload and
 * reports the transfer size, timing and overall success.
 *
 * The file is split into Config::PartSize sized parts. At most
 * Config::Parallel UploadPart requests are kept in flight; finished requests
 * are reaped by polling their futures. Once every part has been uploaded the
 * parts are listed, the upload is completed, and the CRC64 returned by the
 * service is compared with uploadFileCRC64.
 *
 * @param client        OSS client used for every request.
 * @param key           Object key to upload to.
 * @param fileToUpload  Path of the local file to upload.
 * @return taskResult with transferSize, startTimePoint, stopTimePoint and
 *         success filled in.
 *
 * NOTE: the error code/message of the first failure are captured in locals
 * but are not part of what this function stores in the returned taskResult.
 * NOTE: on failure the multipart upload is not aborted here, and part uploads
 * still in flight are not waited for.
 */
static taskResult upload_multipart(const OssClient &client, const std::string &key, const std::string &fileToUpload)
{
    taskResult result;
    result.transferSize = get_file_size(fileToUpload);
    result.startTimePoint = std::chrono::system_clock::now();

    // Set the part size
    const int partSize = Config::PartSize;
    const int64_t fileLength = result.transferSize;

    // Build the list of pending parts: 1-based part number plus byte offset.
    auto partCount = calculate_part_count(fileLength, partSize);
    std::vector<SubTaskInfo> SubTaskInfos;
    for (auto i = 0; i < partCount; i++) {
        SubTaskInfo subinfo;
        subinfo.partNum = i + 1;
        // Assign, then multiply in place, so the product is evaluated in the
        // offset field's own type (presumably wider than int, to avoid
        // overflow on large files -- the field is declared elsewhere).
        subinfo.offset = partSize;
        subinfo.offset *= i;
        SubTaskInfos.push_back(subinfo);
    }

    std::vector<PutObjectOutcomeCallable> outcomes;
    bool failed = false;
    std::string code;
    std::string message;

    InitiateMultipartUploadRequest imuRequest(Config::BucketName, key);
    auto initOutcome = client.InitiateMultipartUpload(imuRequest);
    if (!initOutcome.isSuccess()) {
        failed = true;
        code = initOutcome.error().Code();
        message = initOutcome.error().Message();
    }
    else {
        const std::string uploadId = initOutcome.result().UploadId();

        // Index of the next part to schedule. Using a cursor instead of
        // erasing the vector's front element avoids shifting the remaining
        // entries on every dispatch.
        size_t nextPart = 0;
        while (nextPart < SubTaskInfos.size() && !failed) {
            if (outcomes.size() < static_cast<size_t>(Config::Parallel)) {
                const SubTaskInfo subinfo = SubTaskInfos[nextPart];
                ++nextPart;

                // Each part gets its own stream positioned at the part's offset.
                std::shared_ptr<std::iostream> content = std::make_shared<std::fstream>(fileToUpload, std::ios::in | std::ios::binary);
                content->seekg(subinfo.offset);

                // The final part is usually shorter than partSize; declare only
                // the bytes that actually remain in the file.
                const int64_t remaining = fileLength - static_cast<int64_t>(subinfo.offset);
                const int64_t partLength = (remaining < partSize) ? remaining : static_cast<int64_t>(partSize);

                UploadPartRequest request(Config::BucketName, key, content);
                request.setPartNumber(subinfo.partNum);
                request.setUploadId(uploadId);
                request.setContentLength(partLength);
                auto outcomeCallable = client.UploadPartCallable(request);
                outcomes.emplace_back(std::move(outcomeCallable));
            }

            // Check all outcomes: reap finished uploads, stop at the first failure.
            for (auto it = outcomes.begin(); !failed && it != outcomes.end();) {
                auto status = it->wait_for(std::chrono::milliseconds(100));
                if (status == std::future_status::ready) {
                    auto outcome = it->get();
                    if (!outcome.isSuccess()) {
                        failed = true;
                        code = outcome.error().Code();
                        message = outcome.error().Message();
                    }
                    it = outcomes.erase(it);
                }
                else {
                    ++it;
                }
            }
        }

        // Every part has been dispatched: block on whatever is still in flight.
        for (auto it = outcomes.begin(); !failed && it != outcomes.end(); ++it) {
            auto outcome = it->get();
            if (!outcome.isSuccess()) {
                failed = true;
                code = outcome.error().Code();
                message = outcome.error().Message();
            }
        }

        if (!failed) {
            auto listOutcome = client.ListParts(ListPartsRequest(Config::BucketName, key, uploadId));
            if (!listOutcome.isSuccess()) {
                failed = true;
                code = listOutcome.error().Code();
                message = listOutcome.error().Message();
            }
            else {
                auto cOutcome = client.CompleteMultipartUpload(
                    CompleteMultipartUploadRequest(Config::BucketName, key,
                        listOutcome.result().PartList(),
                        uploadId));
                if (!cOutcome.isSuccess()) {
                    // Report the error of the call that failed (the list call
                    // succeeded on this path).
                    failed = true;
                    code = cOutcome.error().Code();
                    message = cOutcome.error().Message();
                }
                else if (cOutcome.result().CRC64() != uploadFileCRC64) {
                    // uploadFileCRC64 is defined outside this function
                    // (presumably the CRC64 of the local file -- confirm).
                    failed = true;
                    code = "CRC64 check fail.";
                    message = "CRC64 check fail.";
                }
            }
        }
    }

    result.stopTimePoint = std::chrono::system_clock::now();
    result.success = !failed;
    return result;
}