in http/get_compressed/python/server/server.py [0:0]
def generate_chunk_buffers(schema, source, compression):
    # the sink holds the buffer and we give a view of it to the caller
    with LateClosingBytesIO() as sink:
        # Keep buffering until we have at least MIN_BUFFER_SIZE bytes in the
        # buffer before yielding it to the caller. Setting it to 1 means we
        # yield as soon as the compression blocks are formed and reach the
        # sink buffer.
        MIN_BUFFER_SIZE = 64 * 1024
        if compression.startswith("identity"):
            if compression == "identity+zstd":
                options = pa.ipc.IpcWriteOptions(compression="zstd")
            elif compression == "identity+lz4":
                options = pa.ipc.IpcWriteOptions(compression="lz4")
            else:
                options = None
            # source: RecordBatchReader
            # |> writer: RecordBatchStreamWriter
            # |> sink: LateClosingBytesIO
            writer = pa.ipc.new_stream(sink, schema, options=options)
            for batch in source:
                writer.write_batch(batch)
                if sink.tell() >= MIN_BUFFER_SIZE:
                    sink.truncate()
                    with sink.getbuffer() as buffer:
                        yield buffer
                    sink.seek(0)
            writer.close()  # write EOS marker and flush
        else:
            compression = "brotli" if compression == "br" else compression
            with pa.CompressedOutputStream(sink, compression) as compressor:
                # has the first buffer been yielded already?
                sent_first = False
                # source: RecordBatchReader
                # |> writer: RecordBatchStreamWriter
                # |> compressor: CompressedOutputStream
                # |> sink: LateClosingBytesIO
                writer = pa.ipc.new_stream(compressor, schema)
                for batch in source:
                    writer.write_batch(batch)
                    # We try to yield the first buffer ASAP, no matter how
                    # small: if nothing has reached the sink yet, flush the
                    # compressor so its pending compression blocks land in
                    # the sink buffer.
                    if not sent_first and sink.tell() == 0:
                        compressor.flush()
                    pos = sink.tell()
                    if pos >= MIN_BUFFER_SIZE or (not sent_first and pos >= 1):
                        sink.truncate()
                        with sink.getbuffer() as buffer:
                            yield buffer
                        sink.seek(0)
                        sent_first = True
                writer.close()  # write EOS marker and flush
                compressor.close()  # flush the end-of-stream blocks to the sink
        # yield whatever is left in the sink buffer as the final chunk
        sink.truncate()
        with sink.getbuffer() as buffer:
            yield buffer
        sink.close_now()
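
LateClosingBytesIO is referenced above but not defined in this excerpt. Below is a minimal sketch of what it could look like, inferred purely from its usage here: the generator needs the sink to survive the writer's and compressor's close() calls until the final buffer is yielded. The close_now name comes from the code above; the rest is an assumption, not the original class.

import io

class LateClosingBytesIO(io.BytesIO):
    # Ignore the close() triggered by the writer/compressor (and by the
    # enclosing `with` block) so the buffer stays usable until the
    # generator yields the final chunk.
    def close(self):
        pass

    # The actual close, called explicitly by the generator when done.
    def close_now(self):
        super().close()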
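
A hypothetical driver showing how the generator might be consumed; the sample table and the choice of "zstd" are illustrative assumptions, not taken from the original server. Each yielded buffer is a short-lived memoryview into the sink, so it must be consumed (e.g. written to the HTTP response) before the generator is advanced:

import pyarrow as pa

table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
source = pa.RecordBatchReader.from_batches(table.schema, table.to_batches())
total = 0
for buffer in generate_chunk_buffers(table.schema, source, "zstd"):
    # copy or consume the view now; it is released when the generator resumes
    total += len(buffer)
print(f"zstd-compressed IPC stream: {total} bytes across all chunks")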