input/elasticapm/processor.go (269 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package elasticapm

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"

	"go.elastic.co/apm/v2"
	"go.uber.org/zap"

	"github.com/elastic/apm-data/input"
	"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
	"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
	"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/rumv3"
	v2 "github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/v2"
	"github.com/elastic/apm-data/model/modelpb"
)

// Sentinel errors used while decoding a stream:
//   - errUnrecognizedObject is wrapped by readBatch when the root key of a
//     line matches none of the known event types.
//   - errEmptyBody is returned by readMetadata when the stream ends before
//     any bytes were read; HandleStream treats it as a successful no-op.
var (
	errUnrecognizedObject = errors.New("did not recognize object type")
	errEmptyBody          = errors.New("empty body")
)

// Root keys of the ND-JSON lines recognized by the processor. The *EventType
// values select the event decoder in readBatch (the single-letter ones are
// the RUM v3 forms); the *MetadataKey values select the metadata decoder in
// readMetadata.
const (
	errorEventType            = "error"
	metricsetEventType        = "metricset"
	spanEventType             = "span"
	transactionEventType      = "transaction"
	logEventType              = "log"
	rumv3ErrorEventType       = "e"
	rumv3TransactionEventType = "x"
	v2MetadataKey             = "metadata"
	rumv3MetadataKey          = "m"
)

// Processor decodes a stream and is safe for concurrent use. The processor
// accepts a channel that is used as a semaphore to control the maximum
// concurrent number of stream decode operations that can happen at any time.
// The semaphore is meant to be shared between all the processors so
// the concurrency limit is shared between all the intake endpoints.
type Processor struct {
	// sem is acquired once per HandleStream call and released when the
	// call returns.
	sem input.Semaphore
	// logger is never nil when the Processor is built by NewProcessor.
	logger *zap.Logger
	// MaxEventSize is handed to the ND-JSON stream decoder created for each
	// stream; see Config.MaxEventSize.
	MaxEventSize int
}

// Config holds configuration for Processor constructors.
type Config struct {
	// Semaphore holds a semaphore on which Processor.HandleStream will acquire a
	// token before proceeding, to limit concurrency.
	Semaphore input.Semaphore

	// Logger holds a logger for the processor. If Logger is nil,
	// then no logging will be performed.
	Logger *zap.Logger

	// MaxEventSize holds the maximum event size, in bytes.
	MaxEventSize int
}

// StreamHandler is an interface for handling an Elastic APM agent ND-JSON event
// stream. Processor.HandleStream in this package has the matching signature.
type StreamHandler interface {
	// HandleStream decodes events from stream in batches of batchSize,
	// passes each batch to processor, and records the outcome in out.
	HandleStream(
		ctx context.Context,
		baseEvent *modelpb.APMEvent,
		stream io.Reader,
		batchSize int,
		processor modelpb.BatchProcessor,
		out *Result,
	) error
}

// NewProcessor returns a new Processor for processing an event stream from
// Elastic APM agents.
func NewProcessor(cfg Config) *Processor { if cfg.Logger == nil { cfg.Logger = zap.NewNop() } return &Processor{ MaxEventSize: cfg.MaxEventSize, sem: cfg.Semaphore, logger: cfg.Logger, } } func (p *Processor) readMetadata(reader *streamReader, out *modelpb.APMEvent) error { body, err := reader.ReadAhead() if err != nil { if err == io.EOF { if len(reader.LatestLine()) == 0 { return errEmptyBody } return &InvalidInputError{ Message: "EOF while reading metadata", Document: string(reader.LatestLine()), } } return reader.wrapError(err) } switch key := p.identifyEventType(body); string(key) { case v2MetadataKey: if err := v2.DecodeNestedMetadata(reader, out); err != nil { return reader.wrapError(err) } case rumv3MetadataKey: if err := rumv3.DecodeNestedMetadata(reader, out); err != nil { return reader.wrapError(err) } default: return &InvalidInputError{ Message: fmt.Sprintf("%q or %q required", v2MetadataKey, rumv3MetadataKey), Document: string(reader.LatestLine()), } } return nil } // identifyEventType takes a reader and reads ahead the first key of the // underlying json input. This method makes some assumptions met by the // input format: // - the input is in JSON format // - every valid ndjson line only has one root key // - the bytes that we must match on are ASCII func (p *Processor) identifyEventType(body []byte) []byte { // find event type, trim spaces and account for single and double quotes var quote byte var key []byte for i, r := range body { if r == '"' || r == '\'' { quote = r key = body[i+1:] break } } end := bytes.IndexByte(key, quote) if end == -1 { return nil } return key[:end] } // readBatch reads up to `batchSize` events from the ndjson stream into // batch, returning the number of events read and any error encountered. // Callers should always process the n > 0 events returned before considering // the error err. 
func (p *Processor) readBatch( ctx context.Context, baseEvent *modelpb.APMEvent, batchSize int, batch *modelpb.Batch, reader *streamReader, result *Result, ) (int, error) { // input events are decoded and appended to the batch origLen := len(*batch) for i := 0; i < batchSize && !reader.IsEOF(); i++ { body, err := reader.ReadAhead() if err != nil && err != io.EOF { err := reader.wrapError(err) var invalidInput *InvalidInputError if errors.As(err, &invalidInput) { result.addError(err) continue } // return early, we assume we can only recover from a input error types return len(*batch) - origLen, err } if len(body) == 0 { // required for backwards compatibility - sending empty lines was permitted in previous versions continue } // We copy the event for each iteration of the batch, as to avoid // shallow copies of Labels and NumericLabels. input := modeldecoder.Input{Base: baseEvent} switch eventType := p.identifyEventType(body); string(eventType) { case errorEventType: err = v2.DecodeNestedError(reader, &input, batch) case metricsetEventType: err = v2.DecodeNestedMetricset(reader, &input, batch) case spanEventType: err = v2.DecodeNestedSpan(reader, &input, batch) case transactionEventType: err = v2.DecodeNestedTransaction(reader, &input, batch) case logEventType: err = v2.DecodeNestedLog(reader, &input, batch) case rumv3ErrorEventType: err = rumv3.DecodeNestedError(reader, &input, batch) case rumv3TransactionEventType: err = rumv3.DecodeNestedTransaction(reader, &input, batch) default: err = fmt.Errorf("%w: %q", errUnrecognizedObject, eventType) } if err != nil && err != io.EOF { result.addError(&InvalidInputError{ Message: err.Error(), Document: string(reader.LatestLine()), }) } } if reader.IsEOF() { return len(*batch) - origLen, io.EOF } return len(*batch) - origLen, nil } // HandleStream processes a stream of events in batches of batchSize at a time, // updating result as events are accepted, or per-event errors occur. 
// // HandleStream will return an error when a terminal stream-level error occurs, // such as the rate limit being exceeded, or due to authorization errors. In // this case the result will only cover the subset of events accepted. // // Callers must not access result concurrently with HandleStream. func (p *Processor) HandleStream( ctx context.Context, baseEvent *modelpb.APMEvent, reader io.Reader, batchSize int, processor modelpb.BatchProcessor, result *Result, ) error { sp, ctx := apm.StartSpan(ctx, "Stream", "Reporter") defer sp.End() // Limit the number of concurrent batch decodes. // // The semaphore defaults to 200 (N), only allowing N requests to read // an cache Y events (determined by batchSize) from the batch. if err := p.semAcquire(ctx); err != nil { return fmt.Errorf("unable to service request: %w", err) } sr := p.getStreamReader(reader) // Release the semaphore on early exit defer p.sem.Release(1) // The first item is the metadata object. if err := p.readMetadata(sr, baseEvent); err != nil { if err == errEmptyBody { return nil } // no point in continuing if we couldn't read the metadata if _, ok := err.(*InvalidInputError); ok { return fmt.Errorf("cannot read metadata in stream: %w", err) } return &InvalidInputError{ Message: err.Error(), Document: string(sr.LatestLine()), } } batch := make(modelpb.Batch, 0, batchSize) for { // reuse the batch for future iterations without pooling each time batch = batch[:0] err := p.handleStream(ctx, &batch, baseEvent, batchSize, sr, processor, result) if err != nil { if errors.Is(err, io.EOF) { return nil } return fmt.Errorf("cannot handle stream: %w", err) } } } func (p *Processor) handleStream( ctx context.Context, batch *modelpb.Batch, baseEvent *modelpb.APMEvent, batchSize int, sr *streamReader, processor modelpb.BatchProcessor, result *Result, ) error { n, readErr := p.readBatch(ctx, baseEvent, batchSize, batch, sr, result) if n == 0 { return readErr } if err := processor.ProcessBatch(ctx, batch); err != nil { 
return fmt.Errorf("cannot process batch: %w", err) } for _, v := range *batch { switch v.Type() { case modelpb.ErrorEventType: result.AcceptedDetails.Error++ case modelpb.SpanEventType: result.AcceptedDetails.Span++ case modelpb.TransactionEventType: result.AcceptedDetails.Transaction++ case modelpb.MetricEventType: result.AcceptedDetails.Metric++ case modelpb.LogEventType: result.AcceptedDetails.Log++ } } result.Accepted += n return readErr } // getStreamReader returns a streamReader that reads ND-JSON lines from r. func (p *Processor) getStreamReader(r io.Reader) *streamReader { return &streamReader{ processor: p, NDJSONStreamDecoder: decoder.NewNDJSONStreamDecoder(r, p.MaxEventSize), } } func (p *Processor) semAcquire(ctx context.Context) error { sp, ctx := apm.StartSpan(ctx, "Semaphore.Acquire", "Reporter") defer sp.End() return p.sem.Acquire(ctx, 1) } // streamReader wraps NDJSONStreamReader, converting errors to stream errors. type streamReader struct { processor *Processor *decoder.NDJSONStreamDecoder } func (sr *streamReader) wrapError(err error) error { if _, ok := err.(decoder.JSONDecodeError); ok { return &InvalidInputError{ Message: err.Error(), Document: string(sr.LatestLine()), } } var e = err if err, ok := err.(modeldecoder.DecoderError); ok { e = err.Unwrap() } if errors.Is(e, decoder.ErrLineTooLong) { return &InvalidInputError{ TooLarge: true, Message: "event exceeded the permitted size", Document: string(sr.LatestLine()), } } return err }