func()

in arrow/ipc/writer.go [188:261]


// compressBodyBuffers compresses every non-empty buffer in p.body in place,
// using the codec selected by w.codec.
//
// Each compressed buffer is laid out as an 8-byte little-endian prefix holding
// the uncompressed length, followed by the compressed bytes. Nil and
// zero-length buffers are left untouched.
//
// When w.compressNP <= 1 the buffers are compressed sequentially on the
// calling goroutine. Otherwise w.compressNP worker goroutines are started,
// each with its own compressor, and buffer indices are distributed to them
// over a channel. The first error encountered is returned; on error some
// buffers in p.body may already have been replaced by their compressed form.
func (w *recordEncoder) compressBodyBuffers(p *Payload) error {
	// compress replaces p.body[idx] with its length-prefixed compressed form.
	// It only touches the slot at idx, so concurrent calls with distinct
	// indices do not write to the same element.
	compress := func(idx int, codec compressor) error {
		if p.body[idx] == nil || p.body[idx].Len() == 0 {
			return nil
		}
		var buf bytes.Buffer
		// Reserve room for the worst-case compressed size plus the length prefix.
		buf.Grow(codec.MaxCompressedLen(p.body[idx].Len()) + arrow.Int64SizeBytes)
		if err := binary.Write(&buf, binary.LittleEndian, uint64(p.body[idx].Len())); err != nil {
			return err
		}
		// Point the codec at buf so the compressed bytes land after the prefix.
		codec.Reset(&buf)
		if _, err := codec.Write(p.body[idx].Bytes()); err != nil {
			return err
		}
		if err := codec.Close(); err != nil {
			return err
		}
		p.body[idx] = memory.NewBufferBytes(buf.Bytes())
		return nil
	}

	if w.compressNP <= 1 {
		codec := getCompressor(w.codec)
		for idx := range p.body {
			if err := compress(idx, codec); err != nil {
				return err
			}
		}
		return nil
	}

	var (
		wg sync.WaitGroup
		ch = make(chan int)
		// Buffered with one slot per worker: a worker sends at most one error
		// before returning, so the send can never block even though errch is
		// only drained after all workers have finished.
		errch       = make(chan error, w.compressNP)
		ctx, cancel = context.WithCancel(context.Background())
	)
	defer cancel()

	for i := 0; i < w.compressNP; i++ {
		// Register the worker before starting it so wg.Wait cannot return
		// early and the deferred wg.Done never drives the counter negative.
		wg.Add(1)
		go func() {
			defer wg.Done()
			codec := getCompressor(w.codec)
			for {
				select {
				case idx, ok := <-ch:
					if !ok {
						// we're done, channel is closed!
						return
					}

					if err := compress(idx, codec); err != nil {
						errch <- err
						// Tell the other workers and the dispatcher to stop.
						cancel()
						return
					}
				case <-ctx.Done():
					// cancelled, return early
					return
				}
			}
		}()
	}

	// Hand out work, but stop as soon as a worker reports failure; otherwise
	// the send would block forever once every worker has exited.
dispatch:
	for idx := range p.body {
		select {
		case ch <- idx:
		case <-ctx.Done():
			break dispatch
		}
	}

	close(ch)
	wg.Wait()
	close(errch)

	// Yields the first recorded error, or nil if errch is empty and closed.
	return <-errch
}