input/elasticapm/internal/decoder/stream_decoder.go (59 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package decoder import ( "bufio" "bytes" "io" ) // NewNDJSONStreamDecoder returns a new NDJSONStreamDecoder which decodes // ND-JSON lines from r, with a maximum line length of maxLineLength. func NewNDJSONStreamDecoder(r io.Reader, maxLineLength int) *NDJSONStreamDecoder { var dec NDJSONStreamDecoder dec.bufioReader = bufio.NewReaderSize(r, maxLineLength) dec.lineReader = NewLineReader(dec.bufioReader, maxLineLength) return &dec } // NDJSONStreamDecoder decodes a stream of ND-JSON lines from an io.Reader. type NDJSONStreamDecoder struct { latestError error bufioReader *bufio.Reader lineReader *LineReader latestLine []byte latestLineReader bytes.Reader isEOF bool } // Reset sets sr's underlying io.Reader to r, and resets any reading/decoding state. func (dec *NDJSONStreamDecoder) Reset(r io.Reader) { dec.bufioReader.Reset(r) dec.lineReader.Reset(dec.bufioReader) dec.isEOF = false dec.latestLine = nil dec.resetLatestLineReader() } // Decode decodes the next line into v. func (dec *NDJSONStreamDecoder) Decode(v interface{}) error { defer dec.resetLatestLineReader() if dec.latestLineReader.Size() == 0 { _, _ = dec.ReadAhead() // error checked below } if len(dec.latestLine) == 0 || (dec.latestError != nil && !dec.isEOF) { return dec.latestError } iter := json.BorrowIterator(dec.latestLine) defer json.ReturnIterator(iter) iter.ReadVal(v) if iter.Error != nil && iter.Error != io.EOF { return JSONDecodeError("data read error: " + iter.Error.Error()) } return dec.latestError // this might be io.EOF } // ReadAhead reads the next NDJSON line, buffering it for a subsequent call to Decode. func (dec *NDJSONStreamDecoder) ReadAhead() ([]byte, error) { // readLine can return valid data in `buf` _and_ also an io.EOF line, readErr := dec.lineReader.ReadLine() dec.latestLine = line dec.latestLineReader.Reset(dec.latestLine) dec.latestError = readErr dec.isEOF = readErr == io.EOF return line, readErr } func (dec *NDJSONStreamDecoder) resetLatestLineReader() { dec.latestLineReader.Reset(nil) dec.latestError = nil } // IsEOF signals whether the underlying reader reached the end func (dec *NDJSONStreamDecoder) IsEOF() bool { return dec.isEOF } // LatestLine returns the latest line read as []byte func (dec *NDJSONStreamDecoder) LatestLine() []byte { return dec.latestLine } // JSONDecodeError is a custom error that can occur during JSON decoding type JSONDecodeError string func (s JSONDecodeError) Error() string { return string(s) }