input/elasticapm/internal/decoder/line_reader.go (45 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package decoder
import (
"bufio"
"io"
"github.com/pkg/errors"
)
// ErrLineTooLong is returned by LineReader.ReadLine, together with the leading
// bytes of the line, when a line does not fit in the reader's buffer.
var ErrLineTooLong = errors.New("Line exceeded permitted length")
// LineReader reads length-limited lines from streams using a limited amount of memory.
type LineReader struct {
	// br is the buffered reader that lines are read from.
	br *bufio.Reader
	// maxLineLength is the number of leading bytes ReadLine returns for a
	// line that it reports as too long.
	maxLineLength int
	// skip is set after ReadLine has reported ErrLineTooLong; while it is
	// set, ReadLine discards input up to and including the next newline.
	skip bool
}
// NewLineReader returns a LineReader that reads lines from reader and
// truncates lines reported as too long to maxLineLength bytes.
//
// ReadLine detects an over-long line by reader's buffer filling up before a
// newline is seen, so the buffer size of reader determines when a line is
// considered too long.
func NewLineReader(reader *bufio.Reader, maxLineLength int) *LineReader {
	lr := new(LineReader)
	lr.br = reader
	lr.maxLineLength = maxLineLength
	return lr
}
// Reset points lr at a new underlying *bufio.Reader and drops any pending
// "skip the rest of the current line" state. The configured maximum line
// length is left untouched.
func (lr *LineReader) Reset(br *bufio.Reader) {
	lr.br, lr.skip = br, false
}
// ReadLine reads the next line from the underlying reader and returns it
// without its trailing '\n'.
//
// A line is treated as too long when it fills the reader's buffer before a
// newline is found. In that case ReadLine returns ErrLineTooLong together with
// at most `maxLineLength` leading bytes of the line (fewer if the reader's
// buffer is smaller than `maxLineLength`). The next call discards the
// remainder of that line and returns the line that follows it.
//
// Any other error from the underlying reader is returned as is. For a final
// line that has no trailing newline, the line is returned together with
// io.EOF. If the input ends, or a read fails, while the remainder of an
// over-long line is being discarded, a nil slice is returned with the error.
//
// The returned slice points into the reader's buffer and is only valid until
// the next read.
func (lr *LineReader) ReadLine() ([]byte, error) {
	for {
		line, err := lr.br.ReadSlice('\n')
		bufferFull := err == bufio.ErrBufferFull

		if lr.skip {
			switch {
			case bufferFull:
				// Still inside the over-long line; keep discarding.
			case err == io.EOF:
				// The over-long line was the last thing in the stream.
				return nil, io.EOF
			case err != nil:
				// Surface read failures instead of mistaking them for the end
				// of the skipped line. skip stays set because the rest of the
				// line has not been consumed yet.
				return nil, err
			default:
				// Reached the newline that terminates the over-long line.
				lr.skip = false
			}
			continue
		}

		if bufferFull {
			lr.skip = true
			// line is exactly the reader's buffer here, so only truncate when
			// the limit actually fits; slicing past it would panic.
			if limit := lr.maxLineLength; limit >= 0 && limit < len(line) {
				line = line[:limit]
			}
			return line, ErrLineTooLong
		}

		if n := len(line); n > 0 && line[n-1] == '\n' {
			line = line[:n-1]
		}
		return line, err
	}
}