in arrow/ipc/writer.go [188:261]
func (w *recordEncoder) compressBodyBuffers(p *Payload) error {
compress := func(idx int, codec compressor) error {
if p.body[idx] == nil || p.body[idx].Len() == 0 {
return nil
}
var buf bytes.Buffer
buf.Grow(codec.MaxCompressedLen(p.body[idx].Len()) + arrow.Int64SizeBytes)
if err := binary.Write(&buf, binary.LittleEndian, uint64(p.body[idx].Len())); err != nil {
return err
}
codec.Reset(&buf)
if _, err := codec.Write(p.body[idx].Bytes()); err != nil {
return err
}
if err := codec.Close(); err != nil {
return err
}
p.body[idx] = memory.NewBufferBytes(buf.Bytes())
return nil
}
if w.compressNP <= 1 {
codec := getCompressor(w.codec)
for idx := range p.body {
if err := compress(idx, codec); err != nil {
return err
}
}
return nil
}
var (
wg sync.WaitGroup
ch = make(chan int)
errch = make(chan error)
ctx, cancel = context.WithCancel(context.Background())
)
defer cancel()
for i := 0; i < w.compressNP; i++ {
go func() {
defer wg.Done()
codec := getCompressor(w.codec)
for {
select {
case idx, ok := <-ch:
if !ok {
// we're done, channel is closed!
return
}
if err := compress(idx, codec); err != nil {
errch <- err
cancel()
return
}
case <-ctx.Done():
// cancelled, return early
return
}
}
}()
}
for idx := range p.body {
ch <- idx
}
close(ch)
wg.Wait()
close(errch)
return <-errch
}