func()

in accumulator/batch.go [177:224]


func (b *Batch) AddAgentData(apmData APMData) error {
	if len(apmData.Data) == 0 {
		return ErrNoData
	}
	raw, err := GetUncompressedBytes(apmData.Data, apmData.ContentEncoding)
	if err != nil {
		return err
	}

	b.mu.Lock()
	defer b.mu.Unlock()
	if b.count >= b.maxSize {
		return ErrBatchFull
	}
	if b.currentlyExecutingRequestID == "" {
		return errors.New("lifecycle error, currently executing requestID is not set")
	}
	inc, ok := b.invocations[b.currentlyExecutingRequestID]
	if !ok {
		return fmt.Errorf("invocation for current requestID %s does not exist", b.currentlyExecutingRequestID)
	}

	// A request body can either be empty or have a ndjson content with
	// first line being metadata.
	data, after, _ := bytes.Cut(raw, newLineSep)
	if b.metadataBytes == 0 {
		b.metadataBytes, err = b.buf.Write(data)
		if err != nil {
			return fmt.Errorf("failed to write data to buffer: %v", err)
		}
	}
	for {
		data, after, _ = bytes.Cut(after, newLineSep)
		if inc.NeedProxyTransaction() && findEventType(data) == transactionEvent {
			res := gjson.GetBytes(data, "transaction.id")
			if res.Str != "" && inc.TransactionID == res.Str {
				inc.TransactionObserved = true
			}
		}
		if err := b.addData(data); err != nil {
			return err
		}
		if len(after) == 0 {
			break
		}
	}
	return nil
}