in storage/decorator.py [0:0]
def inflate(func: StorageDecoratorCallable[ProtocolStorageType]) -> StorageDecoratorCallable[ProtocolStorageType]:
"""
ProtocolStorage decorator for returning inflated content in case the original is gzipped
"""
def wrapper(
storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
) -> StorageDecoratorIterator:
iterator = func(storage, range_start, body, is_gzipped)
for data, _, _, _, _ in iterator:
if is_gzipped:
gzip_stream = gzip.GzipFile(fileobj=data) # type:ignore
gzip_stream.seek(range_start)
while True:
inflated_chunk: bytes = gzip_stream.read(CHUNK_SIZE)
if len(inflated_chunk) == 0:
break
buffer: BytesIO = BytesIO()
buffer.write(inflated_chunk)
shared_logger.debug("inflate inflate")
yield buffer.getvalue(), 0, 0, b"", None
else:
shared_logger.debug("inflate plain")
yield data, 0, 0, b"", None
return wrapper