azkustoingest/internal/gzip/gzip.go (64 lines of code) (raw):

// Package gzip provides a streaming object for taking in io.ReadCloser that is being written to // and providing an io.ReadCloser that outputs the original content gzip compressed. package gzip import ( "compress/gzip" "io" "sync" "sync/atomic" ) var compressPool = &sync.Pool{ New: func() interface{} { return gzip.NewWriter(nil) }, } // Streamer implements an io.ReadCloser that converts data from a non-compressed stream to a compressed stream. type Streamer struct { userInput io.ReadCloser outputRead *io.PipeReader outputWrite *io.PipeWriter size int64 err atomic.Value // holds error } // New creates a new streamer object. Use Reset() to initialize it. func New() *Streamer { return &Streamer{} } // Reset resets the streamer object to defaults and accepts the io.ReadCloser. // You can only use Reset after a previous reader has closed. func (s *Streamer) Reset(reader io.ReadCloser) { s.userInput = reader s.outputRead, s.outputWrite = io.Pipe() s.size = 0 s.err = atomic.Value{} s.run() } // InputSize returns the amount of data that the Streamer streamed. This will only be accurate for // the full stream after Read() has returned io.EOF and not before. func (s *Streamer) InputSize() int64 { return s.size } func Compress(payload io.Reader) io.Reader { var closer io.ReadCloser var ok bool if closer, ok = payload.(io.ReadCloser); !ok { closer = io.NopCloser(payload) } zw := New() zw.Reset(closer) return zw } // run copies the file into a buffer that we stream back via our Read() call. func (s *Streamer) run() { zw := compressPool.Get().(*gzip.Writer) zw.Reset(s.outputWrite) go func() { defer compressPool.Put(zw) defer s.outputWrite.Close() defer zw.Close() defer zw.Flush() amount, err := io.Copy(zw, s.userInput) s.size = int64(amount) if err != nil { s.err.Store(err) } }() } // Read implements io.Reader. func (s *Streamer) Read(b []byte) (int, error) { amount, err := s.outputRead.Read(b) return amount, err } // Close implements io.Closer. func (s *Streamer) Close() error { return s.outputRead.Close() }