in alibabacloud_oss_v2/encryption_client.py [0:0]
def _get_object_securely(self, request: models.GetObjectRequest, **kwargs
) -> models.GetObjectResult:
    """Download an object and, when it is encrypted, expose a decrypted body.

    If the request carries a range header, the range start is passed through
    ``_adjust_range_start`` and the object is fetched from the adjusted
    position. The extra leading bytes (``discard_count``) are dropped from
    the returned body, and the Content-Range / Content-Length values on the
    result are rewritten so they describe the range the caller asked for.

    Args:
        request: The get-object request. It is never mutated; when the range
            has to be widened, a shallow copy is sent instead.
        **kwargs: Forwarded unchanged to ``self._client.get_object``.

    Returns:
        The result of ``self._client.get_object``. Its body is replaced by
        the decrypting stream when the response has encryption headers, and
        additionally wrapped in a ``StreamBodyDiscarder`` when leading bytes
        must be skipped.

    Raises:
        exceptions.ParamInvalidError: The envelope read from the response
            headers has an unsupported ``cek_algorithm`` or is not valid.
        Exception: Any error raised while post-processing the response is
            re-raised after the response body has been closed.
    """
    adjust_range_start = 0
    discard_count = 0
    erequest = request
    if request.range_header is not None:
        http_range = utils.parse_http_range(request.range_header)
        # NOTE(review): presumably moves the start back to a boundary the
        # content cipher can decrypt from -- confirm in _adjust_range_start.
        adjust_range_start = _adjust_range_start(http_range[0])
        discard_count = http_range[0] - adjust_range_start
        if discard_count > 0:
            # Work on a copy so the caller's request keeps its own range.
            erequest = copy.copy(request)
            # A negative bound is rendered as an empty (open) side.
            range_start = str(adjust_range_start) if adjust_range_start >= 0 else ''
            range_end = str(http_range[1]) if http_range[1] >= 0 else ''
            erequest.range_header = f'bytes={range_start}-{range_end}'
            erequest.range_behavior = 'standard'

    result = self._client.get_object(erequest, **kwargs)

    # From here on the response stream is open. Every step that can fail is
    # kept under one guard so the stream is closed before the error reaches
    # the caller, who never receives `result` and so cannot close it.
    try:
        if _has_encrypted_header(result.headers):
            envelope = _get_envelope_from_header(result.headers)
            if not _is_valid_content_alg(envelope.cek_algorithm or ''):
                raise exceptions.ParamInvalidError(field='envelope.cek_algorithm')
            if not envelope.is_valid():
                raise exceptions.ParamInvalidError(field='envelope')
            # The cipher is positioned at the adjusted start, which is where
            # the fetched data begins.
            cc = self._get_ccbuilder(envelope).content_cipher_from_env(
                envelope, offset=adjust_range_start)
            result.body = cast(StreamBody, cc.decrypt_content(result.body))

        if discard_count > 0:
            # Rewrite Content-Range and Content-Length so they match the
            # range the caller requested rather than the widened one.
            if result.content_range is not None:
                crange = utils.parse_content_range(result.content_range)
                value = f'bytes {crange[0] + discard_count}-{crange[1]}/{crange[2]}'
                result.content_range = value
                result.headers.update({"Content-Range": value})
            else:
                result.headers.update({"Content-Range": f'bytes {discard_count}-/*'})
            if result.content_length is not None:
                result.content_length -= discard_count
                result.headers.update({"Content-Length": str(result.content_length)})
            # Skip the extra leading bytes when the body is read.
            result.body = io_utils.StreamBodyDiscarder(result.body, discard_count)
    except Exception:
        if result.body is not None:
            result.body.close()
        # Bare raise re-raises the active exception with its traceback intact.
        raise

    return result