elastictransport/gzip.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package elastictransport

import (
"bytes"
"compress/gzip"
"fmt"
"io"
"sync"
)

// gzipCompressor gzip-compresses request bodies, optionally reusing writers and buffers between calls.
type gzipCompressor interface {
// compress compresses the given io.ReadCloser and returns the gzip compressed data as a bytes.Buffer.
compress(io.ReadCloser) (*bytes.Buffer, error)
	// collectBuffer returns the given bytes.Buffer to the compressor so it can be reused by later compress calls.
collectBuffer(*bytes.Buffer)
}
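
// Usage sketch (illustrative, not part of this package): a caller compresses a request
// body and, once the compressed bytes have been consumed, hands the buffer back so
// pooled implementations can reuse it. The helper name compressBody and the
// copy-before-return step are assumptions made for the example only.
//
//	func compressBody(c gzipCompressor, body io.ReadCloser) ([]byte, error) {
//		buf, err := c.compress(body) // gzip body into a (possibly pooled) buffer
//		if err != nil {
//			return nil, err
//		}
//		out := append([]byte(nil), buf.Bytes()...) // copy, since the buffer may be reused
//		c.collectBuffer(buf)                       // return the buffer for reuse
//		return out, nil
//	}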

// simpleGzipCompressor is a simple implementation of gzipCompressor that creates a new gzip.Writer for each call.
type simpleGzipCompressor struct {
compressionLevel int
}

// newSimpleGzipCompressor returns a gzipCompressor that allocates a fresh gzip.Writer and buffer on every call.
func newSimpleGzipCompressor(compressionLevel int) gzipCompressor {
return &simpleGzipCompressor{
compressionLevel: compressionLevel,
}
}

// compress reads rc and returns its gzip-compressed contents in a newly allocated buffer.
func (sg *simpleGzipCompressor) compress(rc io.ReadCloser) (*bytes.Buffer, error) {
var buf bytes.Buffer
zw, err := gzip.NewWriterLevel(&buf, sg.compressionLevel)
if err != nil {
		return nil, fmt.Errorf("failed setting up request body compression (level %d): %s",
			sg.compressionLevel, err)
}
if _, err = io.Copy(zw, rc); err != nil {
return nil, fmt.Errorf("failed to compress request body: %s", err)
}
if err := zw.Close(); err != nil {
return nil, fmt.Errorf("failed to compress request body (during close): %s", err)
}
return &buf, nil
}

// collectBuffer is a no-op: simpleGzipCompressor does not reuse buffers.
func (sg *simpleGzipCompressor) collectBuffer(buf *bytes.Buffer) {
	// no-op
}

// pooledGzipCompressor reuses gzip.Writers and bytes.Buffers via sync.Pools to reduce allocations across requests.
type pooledGzipCompressor struct {
gzipWriterPool *sync.Pool
bufferPool *sync.Pool
compressionLevel int
}

// gzipWriter pairs a pooled gzip.Writer with the error returned when it was created,
// since sync.Pool's New function cannot return an error.
type gzipWriter struct {
writer *gzip.Writer
err error
}

// newPooledGzipCompressor returns a new pooledGzipCompressor that uses sync.Pools to reuse gzip.Writers and bytes.Buffers.
func newPooledGzipCompressor(compressionLevel int) gzipCompressor {
gzipWriterPool := sync.Pool{
New: func() any {
writer, err := gzip.NewWriterLevel(io.Discard, compressionLevel)
return &gzipWriter{
writer: writer,
err: err,
}
},
}
bufferPool := sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
return &pooledGzipCompressor{
gzipWriterPool: &gzipWriterPool,
bufferPool: &bufferPool,
compressionLevel: compressionLevel,
}
}

// compress reads rc and returns its gzip-compressed contents in a pooled buffer.
// The caller must hand the buffer back via collectBuffer once it is done with it.
func (pg *pooledGzipCompressor) compress(rc io.ReadCloser) (*bytes.Buffer, error) {
writer := pg.gzipWriterPool.Get().(*gzipWriter)
defer pg.gzipWriterPool.Put(writer)
if writer.err != nil {
		return nil, fmt.Errorf("failed setting up request body compression (level %d): %s",
			pg.compressionLevel, writer.err)
}
buf := pg.bufferPool.Get().(*bytes.Buffer)
buf.Reset()
writer.writer.Reset(buf)
if _, err := io.Copy(writer.writer, rc); err != nil {
return nil, fmt.Errorf("failed to compress request body: %s", err)
}
if err := writer.writer.Close(); err != nil {
return nil, fmt.Errorf("failed to compress request body (during close): %s", err)
}
return buf, nil
}

// collectBuffer returns the given buffer to the pool so later calls can reuse it.
func (pg *pooledGzipCompressor) collectBuffer(buf *bytes.Buffer) {
pg.bufferPool.Put(buf)
}
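
// Example (illustrative only; the compression level and payload wiring below are
// assumptions for the sketch, not taken from this file):
//
//	comp := newPooledGzipCompressor(gzip.DefaultCompression)
//	body := io.NopCloser(bytes.NewReader([]byte(`{"query":{"match_all":{}}}`)))
//	buf, err := comp.compress(body)
//	if err != nil {
//		// handle error
//	}
//	// ... send buf.Bytes() as the gzip-encoded request body ...
//	comp.collectBuffer(buf) // return the buffer to the pool once the body has been sent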