pkg/output/device.go (46 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package output
import (
"bytes"
"io"
"sync"
)
// Device masks an io.Writer with an intermediary buffer that receives
// the writes while the device is paused. Once the device is unpaused
// the masked io.Writer will receive the buffered writes.
type Device struct {
writer io.Writer
buf *bytes.Buffer
paused bool
mu sync.Mutex
}
// NewDevice instantiates a new Device from an io.Writer
func NewDevice(device io.Writer) *Device {
return &Device{
writer: device,
buf: new(bytes.Buffer),
}
}
// Write writes len(p) bytes from p to the underlying data stream.
// It returns the number of bytes written from p (0 <= n <= len(p))
// and any error encountered that caused the write to stop early.
// If the device is paused, instead of writing to the actual device
// it writes to an intermediary buffer that will hold any writes
// while the device is paused. If the device is paused for too long
// and causes the
func (d *Device) Write(p []byte) (n int, err error) {
d.mu.Lock()
defer d.mu.Unlock()
if d.paused {
defer func() {
// Recover from panic if the buffer becomes too large
// nolint
recover()
}()
return d.buf.Write(p)
}
return d.writer.Write(p)
}
// Pause pauses writes to the masked device and writes to an
// intermediary buffer.
func (d *Device) Pause() {
defer d.mu.Unlock()
d.mu.Lock()
d.paused = true
}
// Resume copies the contents of the intermediary buffer to the device
// that is being masked.
func (d *Device) Resume() (n int64, err error) {
defer func() {
d.buf.Reset()
d.mu.Unlock()
}()
d.mu.Lock()
d.paused = false
return io.Copy(d.writer, d.buf)
}
// Writer returns the io.Writer that Device is wrapping.
func (d *Device) Writer() io.Writer {
return d.writer
}