in alibabacloud_oss_v2/uploader.py [0:0]
def _multipart_part(self) -> UploadResult:
    """Upload the source as a multipart upload and return the final result.

    The method runs these steps in order:

    1. Obtain the upload context (which carries the upload id).
    2. If a checkpoint is configured, store the upload id in it and dump it.
    3. Upload every part yielded by ``self._iter_part``; a thread pool is
       used when ``parallel_num`` is greater than 1, otherwise parts are
       uploaded one by one and the loop stops at the first recorded error.
    4. If no error was recorded, send the CompleteMultipartUpload request
       with the uploaded parts sorted by part number.
    5. If any error was recorded, abort the multipart upload (unless
       ``leave_parts_on_error`` is set) and raise the last error.

    Returns:
        UploadResult: built from the CompleteMultipartUpload response
        (etag, version id, CRC64, status, request id and headers).

    Raises:
        Exception: whatever ``self._wrap_error`` builds, either when the
            upload context cannot be obtained or when at least one error
            was recorded in ``self._upload_errors``.
    """
    # init the multipart
    try:
        upload_ctx = self._get_upload_context()
    except Exception as err:
        raise self._wrap_error('', err)

    # update checkpoint
    if self._checkpoint:
        self._checkpoint.upload_id = upload_ctx.upload_id
        self._checkpoint.dump()

    # upload part
    parallel = self._options.parallel_num > 1
    if parallel:
        # NOTE(review): the lock is presumably taken by the part-upload
        # helpers to guard shared state; it is only created on this path.
        self._upload_part_lock = threading.Lock()
        with concurrent.futures.ThreadPoolExecutor(self._options.parallel_num) as executor:
            for result in executor.map(self._upload_part, self._iter_part(upload_ctx)):
                self._update_upload_result(result)
    else:
        for part in self._iter_part(upload_ctx):
            self._update_upload_result(self._upload_part(part))
            # Stop reading further parts as soon as an error is recorded.
            if self._upload_errors:
                break

    # complete upload
    cmresult: models.CompleteMultipartUploadResult = None
    if not self._upload_errors:
        request = models.CompleteMultipartUploadRequest()
        copy_request(request, self._reqeust)
        parts = sorted(self._uploaded_parts, key=lambda p: p.part_number)
        request.upload_id = upload_ctx.upload_id
        request.complete_multipart_upload = models.CompleteMultipartUpload(parts=parts)
        try:
            cmresult = self._client.complete_multipart_upload(request)
        except Exception as err:
            # Recorded rather than raised so the abort logic below runs.
            self._upload_errors.append(err)

    # check last error
    if self._upload_errors:
        if not self._options.leave_parts_on_error:
            try:
                abort_request = models.AbortMultipartUploadRequest()
                # Copy the user request into the *abort* request. The
                # complete request must not be referenced here: it is not
                # even bound when a part upload failed, and the resulting
                # NameError used to be swallowed below, silently skipping
                # the abort and leaving the uploaded parts behind.
                copy_request(abort_request, self._reqeust)
                # Set after the copy so the copy cannot overwrite it.
                abort_request.upload_id = upload_ctx.upload_id
                self._client.abort_multipart_upload(abort_request)
            except Exception:
                # Best-effort cleanup: a failed abort must not mask the
                # original upload error raised just below.
                pass
        raise self._wrap_error(upload_ctx.upload_id, self._upload_errors[-1])

    self._assert_crc_same(cmresult.headers)

    ret = UploadResult(
        upload_id=upload_ctx.upload_id,
        etag=cmresult.etag,
        version_id=cmresult.version_id,
        hash_crc64=cmresult.hash_crc64,
    )
    ret.status = cmresult.status
    ret.status_code = cmresult.status_code
    ret.request_id = cmresult.request_id
    ret.headers = cmresult.headers
    return ret