helpers/trace/buffer.go (173 lines of code) (raw):
package trace
import (
"bufio"
"errors"
"fmt"
"hash"
"hash/crc32"
"io"
"os"
"sync"
"unicode/utf8"
"gitlab.com/gitlab-org/gitlab-runner/helpers"
"golang.org/x/text/encoding"
"golang.org/x/text/transform"
)
const defaultBytesLimit = 4 * 1024 * 1024 // 4MB
var errLogLimitExceeded = errors.New("log limit exceeded")
// Buffer is a size-limited, checksummed job log backed by a temporary file.
// Writes pass through a UTF-8 sanitizer and a byte limiter before being
// buffered out to disk; reads are served directly from the file.
type Buffer struct {
	lock sync.RWMutex
	// lw enforces the byte limit and tracks the total bytes written.
	lw *limitWriter
	// w is the entry point for writes: a transform.Writer that replaces
	// invalid UTF-8 sequences before forwarding data to lw.
	w io.WriteCloser
	// logFile is the temporary file holding the log contents.
	logFile *os.File
	// bufw buffers writes to logFile; Bytes() flushes it before reading.
	bufw *bufio.Writer
	// checksum accumulates a CRC32 of everything written within the limit.
	checksum hash.Hash32
	opts     options
	// failedFlush indicates that a read which subsequently attempted to
	// flush data to the underlying writer failed. In this scenario, calls to
	// Write() will immediately attempt to flush and return any error on a
	// failure.
	failedFlush bool
}
// options holds configuration applied by New(); currently empty but kept
// so Option-based configuration can be extended without breaking callers.
type options struct {
}

// Option configures a Buffer at construction time, returning an error if
// the option cannot be applied.
type Option func(*options) error
// SetLimit overrides the maximum number of bytes the buffer will accept;
// data written beyond the limit is discarded.
func (b *Buffer) SetLimit(size int) {
	b.lock.Lock()
	defer b.lock.Unlock()

	// Guard against a zero-value Buffer, mirroring the nil check in Size():
	// previously this would panic on a nil limitWriter.
	if b.lw == nil {
		return
	}
	b.lw.limit = int64(size)
}
// Size returns the total number of bytes accepted by the buffer so far,
// including any limit-exceeded notice.
func (b *Buffer) Size() int {
	b.lock.RLock()
	defer b.lock.RUnlock()

	if lw := b.lw; lw != nil {
		return int(lw.written)
	}
	// Zero-value Buffer: nothing has been written.
	return 0
}
// Bytes returns up to n bytes of the log starting at offset. Requests past
// the end of the log return an empty slice rather than an error.
func (b *Buffer) Bytes(offset, n int) ([]byte, error) {
	b.lock.Lock()
	defer b.lock.Unlock()

	// For simplicity, we read only from the file, rather than also the bufio.Writer.
	// To ensure the underlying file has the data requested, we always flush the
	// buffer.
	//
	// If a failure occurs on flushing the data, we store that an error occurred so
	// buffer.Write() can retry and additionally return any error on the write side.
	if err := b.bufw.Flush(); err != nil {
		b.failedFlush = true
		return nil, fmt.Errorf("flushing log buffer: %w", err)
	}

	// Clamp the read size to the data actually available. Previously, an
	// offset beyond the end of the log (or a negative n) produced a negative
	// length and make() panicked.
	size := int(b.lw.written - int64(offset))
	if size < 0 {
		size = 0
	}
	if n > size {
		n = size
	}
	if n < 0 {
		n = 0
	}

	buf := make([]byte, n)
	_, err := b.logFile.ReadAt(buf, int64(offset))
	if err == io.EOF {
		// A short read at the end of the log is expected, not an error.
		err = nil
	}
	return buf, err
}
// Write appends p to the log. Once the log limit has been exceeded, data is
// silently dropped and (len(p), nil) is returned so callers proceed as
// normal. If a previous read-side flush failed, Write retries the flush and
// reports any error.
func (b *Buffer) Write(p []byte) (int, error) {
	b.lock.Lock()
	defer b.lock.Unlock()

	n, err := b.w.Write(p)

	// if we get a log limit exceeded error, we've written the log limit
	// notice out to the log and will now silently not write any additional
	// data: we return len(p), nil so the caller continues as normal.
	//
	// errors.Is is used (rather than ==) so the sentinel is still detected
	// if an intermediate writer wraps it.
	if errors.Is(err, errLogLimitExceeded) {
		return len(p), nil
	}

	// if we previously failed to flush to the underlying writer, try again
	// and return any failure immediately.
	if b.failedFlush {
		if err := b.bufw.Flush(); err != nil {
			return n, err
		}
		b.failedFlush = false
	}

	return n, err
}
// Finish closes the write side of the buffer, flushing any pending
// transformer state. The log file itself remains readable until Close.
func (b *Buffer) Finish() {
	b.lock.RLock()
	defer b.lock.RUnlock()

	if w := b.w; w != nil {
		_ = w.Close()
	}
}
// Close closes and deletes the temporary log file. The buffer must not be
// used afterwards.
func (b *Buffer) Close() {
	name := b.logFile.Name()
	_ = b.logFile.Close()
	_ = os.Remove(name)
}
// Checksum returns the CRC32 of the log contents in the form "crc32:<hex>".
func (b *Buffer) Checksum() string {
	b.lock.RLock()
	defer b.lock.RUnlock()

	sum := b.checksum.Sum32()
	return fmt.Sprintf("crc32:%08x", sum)
}
// limitWriter wraps an io.Writer and stops accepting data once a byte
// limit is reached, appending a notice to the log when that happens.
type limitWriter struct {
	w io.Writer
	// written is the total number of bytes forwarded to w, including the
	// limit-exceeded notice.
	written int64
	// limit is the maximum number of bytes accepted via Write.
	limit int64
}
// Write forwards p to the underlying writer while capacity remains. A write
// that reaches or exceeds the limit is truncated on a UTF-8 boundary, a
// limit-exceeded notice is appended, and errLogLimitExceeded is returned.
// Once the limit has been hit, all subsequent writes fail immediately with
// errLogLimitExceeded.
func (w *limitWriter) Write(p []byte) (int, error) {
	remaining := w.limit - w.written
	if remaining <= 0 {
		return 0, errLogLimitExceeded
	}

	hitLimit := int64(len(p)) >= remaining
	if hitLimit {
		// Trim the payload to the remaining capacity without splitting a
		// multi-byte rune.
		p = truncateSafeUTF8(p, remaining)
	}

	n, err := w.w.Write(p)
	// Defend against a misbehaving writer reporting a negative count.
	if n < 0 {
		n = 0
	}
	w.written += int64(n)

	if hitLimit {
		// Signal the limit to the caller even when the write succeeded.
		if err == nil {
			err = errLogLimitExceeded
		}
		w.writeLimitExceededMessage()
	}

	return n, err
}
// writeLimitExceededMessage appends a highlighted notice to the log stating
// that the limit was reached and no further output will be collected. The
// notice's size is counted towards written so it is reported by Size().
func (w *limitWriter) writeLimitExceededMessage() {
	const format = "\n%sJob's log exceeded limit of %v bytes.\n" +
		"Job execution will continue but no more output will be collected.%s\n"

	// The error is intentionally ignored: there is no useful recovery if
	// the notice itself cannot be written.
	n, _ := fmt.Fprintf(w.w, format, helpers.ANSI_BOLD_YELLOW, w.limit, helpers.ANSI_RESET)
	w.written += int64(n)
}
// New creates a file-backed Buffer configured via opts. The write pipeline
// is: UTF-8 sanitizer -> byte limiter -> (buffered log file + CRC32).
func New(opts ...Option) (*Buffer, error) {
	logFile, err := newLogFile()
	if err != nil {
		return nil, err
	}

	options := options{}
	for _, o := range opts {
		if err := o(&options); err != nil {
			// Don't leak the temporary file when an option fails; previously
			// it was left behind on disk.
			_ = logFile.Close()
			_ = os.Remove(logFile.Name())
			return nil, err
		}
	}

	buffer := &Buffer{
		logFile:  logFile,
		bufw:     bufio.NewWriter(logFile),
		checksum: crc32.NewIEEE(),
		opts:     options,
	}

	// Everything written within the limit is mirrored to the checksum.
	buffer.lw = &limitWriter{
		w:       io.MultiWriter(buffer.bufw, buffer.checksum),
		written: 0,
		limit:   defaultBytesLimit,
	}

	// All writes pass through a transformer that replaces invalid UTF-8.
	buffer.w = transform.NewWriter(buffer.lw, encoding.Replacement.NewEncoder())

	return buffer, nil
}
// newLogFile creates the temporary file that backs a Buffer's log, placed
// in the default temp directory with a "trace" name prefix.
func newLogFile() (*os.File, error) {
	return os.CreateTemp("", "trace")
}
// truncateSafeUTF8 truncates a job log at the capacity but avoids breaking
// up a multi-byte UTF-8 character, backing up at most three bytes (a rune
// is never longer than four bytes).
func truncateSafeUTF8(p []byte, capacity int64) []byte {
	for attempts := 0; attempts < 4; attempts++ {
		r, size := utf8.DecodeLastRune(p[:capacity])
		// (RuneError, 1) marks an invalid encoding, i.e. a rune split by
		// the cut; any other result means the boundary is safe.
		if r != utf8.RuneError || size != 1 {
			break
		}
		capacity--
	}
	return p[:capacity]
}