accumulator/batch.go (271 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package accumulator
import (
"bytes"
"errors"
"fmt"
"sync"
"time"
"github.com/tidwall/gjson"
)
var (
// ErrMetadataUnavailable is returned when a lambda data is added to
// the batch without metadata being set.
ErrMetadataUnavailable = errors.New("metadata is not yet available")
// ErrBatchFull signifies that the batch has reached full capacity
// and cannot accept more entries.
ErrBatchFull = errors.New("batch is full")
// ErrInvalidEncoding is returned for any APMData that is encoded
// with any encoding format
ErrInvalidEncoding = errors.New("encoded data not supported")
// ErrNoData indicates that APMData.data is empty
ErrNoData = errors.New("no data")
)
var (
maxSizeThreshold = 0.9
zeroTime = time.Time{}
newLineSep = []byte("\n")
transactionKey = "transaction"
metadataKey = "metadata"
)
type eventType int
const (
metadataEvent = iota
transactionEvent
otherEvent
)
// Batch manages the data that needs to be shipped to APM Server. It holds
// all the invocations that have not yet been shipped to the APM Server and
// is responsible for correlating the invocation with the APM data collected
// from all sources (logs API & APM Agents). As the batch gets the required
// data it marks the data ready for shipping to APM Server.
type Batch struct {
mu sync.RWMutex
// metadataBytes is the size of the metadata in bytes
metadataBytes int
// buf holds data that is ready to be shipped to APM-Server
buf bytes.Buffer
// invocations holds the data for a specific invocation with
// request ID as the key.
invocations map[string]*Invocation
count int
age time.Time
maxSize int
maxAge time.Duration
platformStartRequestID string
// currentlyExecutingRequestID represents the request ID of the currently
// executing lambda invocation. The ID can be set either on agent init or
// when extension receives the invoke event. If the agent hooks into the
// invoke lifecycle then it is possible to receive the agent init request
// before extension invoke is registered.
currentlyExecutingRequestID string
}
// NewBatch creates a new BatchData which can accept a
// maximum number of entries as specified by the arguments.
func NewBatch(maxSize int, maxAge time.Duration) *Batch {
return &Batch{
invocations: make(map[string]*Invocation),
maxSize: maxSize,
maxAge: maxAge,
}
}
// Size returns the number of invocations cached in the batch.
func (b *Batch) Size() int {
b.mu.RLock()
defer b.mu.RUnlock()
return len(b.invocations)
}
// RegisterInvocation registers a new function invocation against its request
// ID. It also updates the caches for currently executing request ID.
func (b *Batch) RegisterInvocation(
reqID, functionARN string,
deadlineMs int64,
timestamp time.Time,
) {
b.mu.Lock()
defer b.mu.Unlock()
i, ok := b.invocations[reqID]
if !ok {
i = &Invocation{}
b.invocations[reqID] = i
}
i.RequestID = reqID
i.FunctionARN = functionARN
i.DeadlineMs = deadlineMs
i.Timestamp = timestamp
b.currentlyExecutingRequestID = reqID
}
// OnAgentInit caches the transaction ID and the payload for the currently
// executing invocation as reported by the agent. The payload can contain
// metadata along with partial transaction. Metadata, if available, will
// be cached for all future invocation. The agent payload will be used to
// create a new transaction in an event the actual transaction is not
// reported by the agent due to unexpected termination.
func (b *Batch) OnAgentInit(reqID, contentEncoding string, raw []byte) error {
payload, err := GetUncompressedBytes(raw, contentEncoding)
if err != nil {
return fmt.Errorf("failed to decompress request body: %w", err)
}
var metadata, txnData []byte
switch findEventType(payload) {
case metadataEvent:
metadata, txnData, _ = bytes.Cut(payload, newLineSep)
case transactionEvent:
txnData = payload
default:
return errors.New("invalid payload")
}
txnID := gjson.GetBytes(txnData, "transaction.id").String()
if txnID == "" {
return errors.New("failed to parse transaction id from registration body")
}
b.mu.Lock()
defer b.mu.Unlock()
if b.metadataBytes == 0 && len(metadata) > 0 {
b.metadataBytes, err = b.buf.Write(metadata)
if err != nil {
return fmt.Errorf("failed to write metadata to buffer: %v", err)
}
}
i, ok := b.invocations[reqID]
if !ok {
// It is possible that the invocation is registered at a later time
i = &Invocation{}
b.invocations[reqID] = i
}
i.TransactionID, i.AgentPayload = txnID, txnData
b.currentlyExecutingRequestID = reqID
return nil
}
// AddAgentData adds a data received from agent. For a specific invocation
// agent data is always received in the same invocation. All the events
// extracted from the payload are added to the batch even though the batch
// might exceed the max size limit, however, if the batch is already full
// before adding any events then ErrBatchFull is returned.
func (b *Batch) AddAgentData(apmData APMData) error {
if len(apmData.Data) == 0 {
return ErrNoData
}
raw, err := GetUncompressedBytes(apmData.Data, apmData.ContentEncoding)
if err != nil {
return err
}
b.mu.Lock()
defer b.mu.Unlock()
if b.count >= b.maxSize {
return ErrBatchFull
}
if b.currentlyExecutingRequestID == "" {
return errors.New("lifecycle error, currently executing requestID is not set")
}
inc, ok := b.invocations[b.currentlyExecutingRequestID]
if !ok {
return fmt.Errorf("invocation for current requestID %s does not exist", b.currentlyExecutingRequestID)
}
// A request body can either be empty or have a ndjson content with
// first line being metadata.
data, after, _ := bytes.Cut(raw, newLineSep)
if b.metadataBytes == 0 {
b.metadataBytes, err = b.buf.Write(data)
if err != nil {
return fmt.Errorf("failed to write data to buffer: %v", err)
}
}
for {
data, after, _ = bytes.Cut(after, newLineSep)
if inc.NeedProxyTransaction() && findEventType(data) == transactionEvent {
res := gjson.GetBytes(data, "transaction.id")
if res.Str != "" && inc.TransactionID == res.Str {
inc.TransactionObserved = true
}
}
if err := b.addData(data); err != nil {
return err
}
if len(after) == 0 {
break
}
}
return nil
}
// OnLambdaLogRuntimeDone prepares the data for the invocation to be shipped
// to APM Server. It accepts requestID and status of the invocation both of
// which can be retrieved after parsing `platform.runtimeDone` event.
func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string, endTime time.Time) error {
b.mu.Lock()
defer b.mu.Unlock()
return b.finalizeInvocation(reqID, status, endTime)
}
func (b *Batch) OnPlatformStart(reqID string) {
b.platformStartRequestID = reqID
}
func (b *Batch) PlatformStartReqID() string {
return b.platformStartRequestID
}
// OnPlatformReport should be the last event for a request ID. On receiving the
// platform.report event the batch will cleanup any datastructure for the request
// ID. It will return some of the function metadata to allow the caller to enrich
// the report metrics.
//
//nolint:gocritic
func (b *Batch) OnPlatformReport(reqID string) (string, int64, time.Time, error) {
b.mu.Lock()
defer b.mu.Unlock()
inc, ok := b.invocations[reqID]
if !ok {
return "", 0, time.Time{}, fmt.Errorf("invocation for requestID %s does not exist", reqID)
}
delete(b.invocations, reqID)
return inc.FunctionARN, inc.DeadlineMs, inc.Timestamp, nil
}
// OnShutdown flushes the data for shipping to APM Server by finalizing all
// the invocation in the batch. If we haven't received a platform.runtimeDone
// event for an invocation so far we won't be able to receive it in time thus
// the status needs to be guessed based on the available information.
func (b *Batch) OnShutdown(status string) error {
b.mu.Lock()
defer b.mu.Unlock()
for _, inc := range b.invocations {
// Assume that the transaction took all the function time.
// TODO: @lahsivjar Is it possible to tweak the extension lifecycle in
// a way that we receive the platform.report metric for a invocation
// consistently and enrich the metrics with reported values?
endTime := time.Unix(0, inc.DeadlineMs*int64(time.Millisecond))
if err := b.finalizeInvocation(inc.RequestID, status, endTime); err != nil {
return err
}
delete(b.invocations, inc.RequestID)
}
return nil
}
// AddLambdaData adds a new entry to the batch. Returns ErrBatchFull
// if batch has reached its maximum size.
func (b *Batch) AddLambdaData(d []byte) error {
b.mu.Lock()
defer b.mu.Unlock()
if b.count >= b.maxSize {
return ErrBatchFull
}
return b.addData(d)
}
// Count return the number of APMData entries in batch.
func (b *Batch) Count() int {
b.mu.RLock()
defer b.mu.RUnlock()
return b.count
}
// ShouldShip indicates when a batch is ready for sending.
// A batch is marked as ready for flush when one of the
// below conditions is reached:
// 1. max size is greater than threshold (90% of maxSize)
// 2. batch is older than maturity age
func (b *Batch) ShouldShip() bool {
b.mu.RLock()
defer b.mu.RUnlock()
return (b.count >= int(float64(b.maxSize)*maxSizeThreshold)) ||
(!b.age.IsZero() && time.Since(b.age) > b.maxAge)
}
// Reset resets the batch to prepare for new set of data
func (b *Batch) Reset() {
b.mu.Lock()
defer b.mu.Unlock()
b.count, b.age = 0, zeroTime
b.buf.Truncate(b.metadataBytes)
}
// ToAPMData returns APMData with metadata and the accumulated batch
func (b *Batch) ToAPMData() APMData {
b.mu.RLock()
defer b.mu.RUnlock()
return APMData{
Data: b.buf.Bytes(),
}
}
func (b *Batch) finalizeInvocation(reqID, status string, endTime time.Time) error {
inc, ok := b.invocations[reqID]
if !ok {
return fmt.Errorf("invocation for requestID %s does not exist", reqID)
}
proxyTxn, err := inc.MaybeCreateProxyTxn(status, endTime)
if err != nil {
return err
}
err = b.addData(proxyTxn)
if err != nil {
return err
}
inc.Finalized = true
return nil
}
func (b *Batch) addData(data []byte) error {
if len(data) == 0 {
return nil
}
if b.metadataBytes == 0 {
return ErrMetadataUnavailable
}
if err := b.buf.WriteByte('\n'); err != nil {
return err
}
if _, err := b.buf.Write(data); err != nil {
return err
}
if b.count == 0 {
// For first entry, set the age of the batch
b.age = time.Now()
}
b.count++
return nil
}
func findEventType(body []byte) eventType {
var quote byte
var key []byte
for i, r := range body {
if r == '"' || r == '\'' {
quote = r
key = body[i+1:]
break
}
}
end := bytes.IndexByte(key, quote)
if end == -1 {
return otherEvent
}
switch string(key[:end]) {
case transactionKey:
return transactionEvent
case metadataKey:
return metadataEvent
}
return otherEvent
}