in alibabacloud_oss_v2/client.py [0:0]
def get_object_to_file(self, request: models.GetObjectRequest, filepath: str, **kwargs) -> models.GetObjectResult:
    """Download an object and write its content to a local file.

    The response body is streamed chunk by chunk into ``filepath``. When the
    client has the ``FF_ENABLE_CRC64_CHECK_DOWNLOAD`` feature enabled, a CRC64
    of the received bytes is compared with the ``x-oss-hash-crc64ecma``
    response header (the check is skipped if the header is absent). An attempt
    that fails while streaming or on the checksum comparison is repeated from
    scratch, for at most ``self._client.get_retry_attempts()`` attempts and
    always at least once.

    Args:
        request (GetObjectRequest): Request parameters for GetObject operation.
        filepath (str): The path of the local file to write. The file is
            opened in ``'wb'`` mode on every attempt, so existing content is
            truncated.

    Returns:
        GetObjectResult: Response result for GetObject operation.

    Raises:
        InconsistentError: If the locally computed CRC64 differs from the one
            reported by the server on the last attempt.
        Exception: The error captured on the last attempt when every attempt
            failed. Errors raised by ``get_object`` itself or by opening the
            file are not retried here and propagate immediately.
    """
    prog = None
    if request.progress_fn:
        # -1 is a placeholder total; the real value is filled in from the
        # response's content_length once the request has been answered.
        prog = Progress(request.progress_fn, -1)

    chash = None
    if self._client.has_feature(FF_ENABLE_CRC64_CHECK_DOWNLOAD):
        chash = Crc64(0)

    def _crc_checker(headers):
        # Compare the running client-side CRC64 with the server-side header.
        if chash is None:
            return
        scrc = headers.get('x-oss-hash-crc64ecma', None)
        if scrc is None:
            return
        ccrc = str(chash.sum64())
        if scrc != ccrc:
            raise exceptions.InconsistentError(
                client_crc=ccrc,
                server_crc=scrc
            )

    def _get_object_to_file_no_retry(client: Client, request: models.GetObjectRequest, filepath: str, **kwargs):
        # Single download attempt; returns (result, error-or-None) so the
        # caller can decide whether to try again.
        with open(filepath, 'wb') as f:
            err = None
            result = client.get_object(request, **kwargs)
            if prog is not None:
                prog._total = result.content_length
            try:
                for d in result.body.iter_bytes():
                    f.write(d)
                    if prog is not None:
                        prog.write(d)
                    if chash is not None:
                        chash.write(d)
                _crc_checker(result.headers)
            except Exception as e:
                err = e
            finally:
                # Release the response body even if a non-Exception error
                # (e.g. KeyboardInterrupt) interrupts the streaming loop.
                result.body.close()
            return result, err

    # range(1, n) ran only n - 1 attempts and none at all for n <= 1, which
    # made the method return None without downloading anything. Always make
    # at least one attempt.
    attempts = max(1, self._client.get_retry_attempts())

    result = None
    err = None
    for _ in range(attempts):
        result, err = _get_object_to_file_no_retry(self, request, filepath, **kwargs)
        if err is None:
            break
        # Start the next attempt with clean progress and checksum state,
        # since the file is rewritten from the beginning.
        if prog is not None:
            prog.reset()
        if chash is not None:
            chash.reset()

    if err is not None:
        raise err
    return result