in accumulator/batch.go [177:224]
func (b *Batch) AddAgentData(apmData APMData) error {
if len(apmData.Data) == 0 {
return ErrNoData
}
raw, err := GetUncompressedBytes(apmData.Data, apmData.ContentEncoding)
if err != nil {
return err
}
b.mu.Lock()
defer b.mu.Unlock()
if b.count >= b.maxSize {
return ErrBatchFull
}
if b.currentlyExecutingRequestID == "" {
return errors.New("lifecycle error, currently executing requestID is not set")
}
inc, ok := b.invocations[b.currentlyExecutingRequestID]
if !ok {
return fmt.Errorf("invocation for current requestID %s does not exist", b.currentlyExecutingRequestID)
}
// A request body can either be empty or have a ndjson content with
// first line being metadata.
data, after, _ := bytes.Cut(raw, newLineSep)
if b.metadataBytes == 0 {
b.metadataBytes, err = b.buf.Write(data)
if err != nil {
return fmt.Errorf("failed to write data to buffer: %v", err)
}
}
for {
data, after, _ = bytes.Cut(after, newLineSep)
if inc.NeedProxyTransaction() && findEventType(data) == transactionEvent {
res := gjson.GetBytes(data, "transaction.id")
if res.Str != "" && inc.TransactionID == res.Str {
inc.TransactionObserved = true
}
}
if err := b.addData(data); err != nil {
return err
}
if len(after) == 0 {
break
}
}
return nil
}