in alibabacloud_oss_v2/copier.py [0:0]
def _multipart_copy(self) -> CopyResult:
    """Copy the object via the multipart-upload API and return the result.

    Steps, in order:

    1. ``_get_tag_props()`` and ``_init_upload()`` are called to prepare
       the upload.
    2. The part size is grown in steps of the configured
       ``self._options.part_size`` until ``_total_size / part_size`` drops
       below ``defaults.MAX_UPLOAD_PARTS``; the grown value is written back
       to ``self._options.part_size``.
    3. Each part yielded by ``_iter_part()`` is handed to ``_copy_part`` --
       on a thread pool when ``parallel_num > 1``, otherwise one at a time
       -- and each result goes to ``_update_upload_result_lock``.
    4. If ``self._copy_errors`` is empty, the upload is completed with the
       entries of ``self._copy_parts`` ordered by ``part_number``.

    Returns:
        A ``CopyResult`` filled in from the complete-multipart-upload
        result (etag, version id, CRC64, status, request id, headers).

    Raises:
        Exception: the last entry of ``self._copy_errors`` when at least
            one error was recorded. Before raising, the upload is aborted
            (best effort) unless ``leave_parts_on_error`` is set.
        Whatever ``_assert_crc_same`` raises (presumably on a checksum
        mismatch -- confirm against that helper).
    """
    # Fetch tag properties if necessary, then start the multipart upload.
    self._get_tag_props()
    self._init_upload()

    # Grow the part size in steps of the configured size until the number
    # of parts is below the maximum, then persist the adjusted size.
    step = self._options.part_size
    adjusted_size = step
    while self._total_size / adjusted_size >= defaults.MAX_UPLOAD_PARTS:
        adjusted_size += step
    self._options.part_size = adjusted_size

    # Copy the parts.
    if self._options.parallel_num > 1:
        self._copy_part_lock = threading.Lock()
        with concurrent.futures.ThreadPoolExecutor(self._options.parallel_num) as pool:
            # No early exit in this branch: every mapped result is consumed.
            for outcome in pool.map(self._copy_part, self._iter_part()):
                self._update_upload_result_lock(outcome)
    else:
        for part in self._iter_part():
            self._update_upload_result_lock(self._copy_part(part))
            # Sequential mode stops at the first recorded error.
            if self._copy_errors:
                break

    # Complete the upload only when no error has been recorded so far.
    completed = None
    if not self._copy_errors:
        complete_request = models.CompleteMultipartUploadRequest()
        copy_request(complete_request, self._request)
        ordered_parts = list(self._copy_parts)
        ordered_parts.sort(key=lambda item: item.part_number)
        complete_request.upload_id = self._upload_id
        complete_request.complete_multipart_upload = models.CompleteMultipartUpload(
            parts=ordered_parts
        )
        try:
            completed = self._client.complete_multipart_upload(complete_request)
        except Exception as exc:
            # A failed completion is treated like any other copy error.
            self._copy_errors.append(exc)

    # On any error: optionally abort the upload, then surface the last error.
    if self._copy_errors:
        if not self._options.leave_parts_on_error:
            try:
                cleanup_request = models.AbortMultipartUploadRequest()
                copy_request(cleanup_request, self._request)
                cleanup_request.upload_id = self._upload_id
                self._client.abort_multipart_upload(cleanup_request)
            except Exception:
                # Abort is best effort; the recorded error is raised below.
                pass
        raise self._copy_errors[-1]

    self._assert_crc_same(completed.headers)

    outcome = CopyResult(
        upload_id=self._upload_id,
        etag=completed.etag,
        version_id=completed.version_id,
        hash_crc64=completed.hash_crc64,
    )
    outcome.status = completed.status
    outcome.status_code = completed.status_code
    outcome.request_id = completed.request_id
    outcome.headers = completed.headers
    return outcome