def generate_chunk_buffers()

in http/get_compressed/python/server/server.py [0:0]


def generate_chunk_buffers(schema, source, compression, *, min_buffer_size=64 * 1024):
    """Serialize ``source`` as an Arrow IPC stream and yield it in chunks.

    Args:
        schema: Arrow schema of the stream, passed to ``pa.ipc.new_stream``.
        source: iterable of record batches (e.g. a ``RecordBatchReader``).
        compression: a value starting with ``"identity"`` writes the IPC
            stream without stream-level compression. ``"identity+zstd"`` and
            ``"identity+lz4"`` enable Arrow IPC buffer compression. Any other
            ``"identity..."`` value writes no IPC compression. Every other
            value is a codec name for ``pa.CompressedOutputStream`` applied to
            the whole stream; ``"br"`` is mapped to ``"brotli"``.
        min_buffer_size: number of bytes to accumulate in the sink before
            yielding a chunk. ``1`` yields as soon as any bytes reach the
            sink. Values below 1 behave like 1.

    Yields:
        A view of the internal sink (``sink.getbuffer()``). Each view is
        released when the next item is requested, so the consumer must
        finish with it (or copy it) before advancing the generator.
    """
    # A threshold below 1 would only trigger drains of an empty sink.
    threshold = max(min_buffer_size, 1)
    # The sink holds the bytes; callers receive views of it, never copies.
    with LateClosingBytesIO() as sink:
        try:
            if compression.startswith("identity"):
                codec = {"identity+zstd": "zstd", "identity+lz4": "lz4"}.get(compression)
                options = pa.ipc.IpcWriteOptions(compression=codec) if codec else None
                # source: RecordBatchReader
                #   |> writer: RecordBatchStreamWriter
                #   |> sink: LateClosingBytesIO
                with pa.ipc.new_stream(sink, schema, options=options) as writer:
                    for batch in source:
                        writer.write_batch(batch)
                        if sink.tell() >= threshold:
                            yield from _drain_sink(sink)
                # Leaving the writer's ``with`` closed it, which writes the
                # EOS marker into the sink.
            else:
                codec = "brotli" if compression == "br" else compression
                # source: RecordBatchReader
                #   |> writer: RecordBatchStreamWriter
                #   |> compressor: CompressedOutputStream
                #   |> sink: LateClosingBytesIO
                with pa.CompressedOutputStream(sink, codec) as compressor:
                    # Has the first (possibly tiny) chunk been yielded yet?
                    sent_first = False
                    with pa.ipc.new_stream(compressor, schema) as writer:
                        for batch in source:
                            writer.write_batch(batch)
                            # Force bytes out of the compressor so the first
                            # chunk can be yielded ASAP, no matter how small.
                            if not sent_first and sink.tell() == 0:
                                compressor.flush()
                            pos = sink.tell()
                            if pos >= threshold or (not sent_first and pos > 0):
                                yield from _drain_sink(sink)
                                sent_first = True
                # Nested ``with`` exits close the writer first (EOS marker)
                # and then the compressor, which flushes its tail into sink.
            # Hand over whatever is still buffered in the sink.
            yield from _drain_sink(sink)
        finally:
            # Runs on normal completion and also when the consumer stops
            # early or ``source`` raises, so the sink is always released.
            sink.close_now()


def _drain_sink(sink):
    """Yield a view of everything written to ``sink`` so far, then rewind it.

    Nothing is yielded when the sink is empty. The view is released before
    the sink is touched again, so the next ``truncate``/``write`` is allowed.
    """
    # After a rewind, a shorter chunk can leave stale bytes past the current
    # position; cut them off so the view covers only fresh data.
    sink.truncate()
    if sink.tell() > 0:
        with sink.getbuffer() as buffer:
            yield buffer
    sink.seek(0)