// apmproxy/apmserver.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apmproxy

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"time"
"github.com/elastic/apm-aws-lambda/accumulator"
"github.com/elastic/apm-aws-lambda/version"
"go.uber.org/zap"
)
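
// jsonResult mirrors the error response body returned by the APM server
// intake API when a request is rejected.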
type jsonResult struct {
Errors []jsonError `json:"errors,omitempty"`
}
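
// jsonError is a single error entry in an APM server error response,
// optionally including the offending document.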
type jsonError struct {
Message string `json:"message"`
Document string `json:"document,omitempty"`
}

// ForwardApmData receives APM data as it arrives and posts it to the APM server.
// It stops checking for, and sending, APM data once the function invocation
// has completed, which is signaled by cancellation of the context.
func (c *Client) ForwardApmData(ctx context.Context) error {
if c.IsUnhealthy() {
c.logger.Warn("Failed to start APM data forwarder due to client unhealthy")
return nil
}
var lambdaDataChan chan []byte
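// lambdaDataChan starts out nil, so receiving from it blocks forever; it is
// only assigned once metadata is available, after the first agent data has
// been forwarded successfully.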
for {
select {
case <-ctx.Done():
c.logger.Debug("Invocation context canceled, not processing any more agent data")
return nil
case data := <-c.AgentDataChannel:
if len(data.Data) == 0 {
c.logger.Debugf("Received something from '%s' without APMData", data.AgentInfo)
continue
}
if err := c.forwardAgentData(ctx, data); err != nil {
return err
}
if lambdaDataChan == nil {
// With the first successful request to c.forwardAgentData() metadata should be
// available and processing data from c.LambdaDataChannel can start.
lambdaDataChan = c.LambdaDataChannel
c.logger.Debug("Assigned Lambda data channel")
}
case data := <-lambdaDataChan:
if err := c.forwardLambdaData(ctx, data); err != nil {
return err
}
}
}
}

// FlushAPMData drains both the agent and lambda data channels, sending all
// remaining APM data to the APM server.
func (c *Client) FlushAPMData(ctx context.Context) {
if c.IsUnhealthy() {
c.logger.Debug("Flush skipped - Transport failing")
return
}
c.logger.Debug("Flush started - Checking for agent data")
// Flush agent data first to make sure metadata is available if possible
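// Drain only the items already buffered: the channel is never closed, so
// ranging over it would block indefinitely.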
for i := len(c.AgentDataChannel); i > 0; i-- {
data := <-c.AgentDataChannel
if err := c.forwardAgentData(ctx, data); err != nil {
c.logger.Errorf("Error sending to APM Server, skipping: %v", err)
}
}
// If metadata is still not available, fail fast
if c.batch == nil {
c.logger.Warnf("Metadata not available at flush, skipping sending lambda data to APM Server")
return
}
// Flush lambda data
c.logger.Debug("Flush in progress - Processing lambda data")
for {
select {
case apmData := <-c.LambdaDataChannel:
if err := c.forwardLambdaData(ctx, apmData); err != nil {
c.logger.Errorf("Error sending to APM server, skipping: %v", err)
}
case <-ctx.Done():
c.logger.Debug("Failed to flush completely, may result in data drop")
return
default:
// Flush any remaining data in batch
if err := c.sendBatch(ctx); err != nil {
c.logger.Errorf("Error sending to APM server, skipping: %v", err)
}
c.logger.Debug("Flush ended for lambda data - no data in buffer")
return
}
}
}

// PostToApmServer takes a chunk of APM agent data and posts it to the APM server.
//
// The function compresses the APM agent data, if it's not already compressed.
// It sets the APM transport status to failing upon errors, as part of the backoff
// strategy.
func (c *Client) PostToApmServer(ctx context.Context, apmData accumulator.APMData) error {
// TODO: can this be a streaming or streaming-style call that keeps the
// connection open across invocations?
if c.IsUnhealthy() {
return errors.New("transport status is unhealthy")
}
endpointURI := "intake/v2/events"
encoding := apmData.ContentEncoding
agentInfo := apmData.AgentInfo
var r io.Reader
if apmData.ContentEncoding != "" {
r = bytes.NewReader(apmData.Data)
} else {
encoding = "gzip"
buf := c.bufferPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
c.bufferPool.Put(buf)
}()
gw, err := gzip.NewWriterLevel(buf, gzip.BestSpeed)
if err != nil {
return err
}
if _, err := gw.Write(apmData.Data); err != nil {
return fmt.Errorf("failed to compress data: %w", err)
}
if err := gw.Close(); err != nil {
return fmt.Errorf("failed to write compressed data to buffer: %w", err)
}
r = buf
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.serverURL+endpointURI, r)
if err != nil {
return fmt.Errorf("failed to create a new request when posting to APM server: %w", err)
}
req.Header.Add("Content-Encoding", encoding)
req.Header.Add("Content-Type", "application/x-ndjson")
req.Header.Set("User-Agent", version.UserAgent+" "+agentInfo)
if c.ServerAPIKey != "" {
req.Header.Add("Authorization", "ApiKey "+c.ServerAPIKey)
} else if c.ServerSecretToken != "" {
req.Header.Add("Authorization", "Bearer "+c.ServerSecretToken)
}
c.logger.Debug("Sending data chunk to APM server")
resp, err := c.client.Do(req)
if err != nil {
c.UpdateStatus(ctx, Failing)
return fmt.Errorf("failed to post to APM server: %v", err)
}
defer resp.Body.Close()
// On success, the server will respond with a 202 Accepted status code and no body.
if resp.StatusCode == http.StatusAccepted {
c.UpdateStatus(ctx, Healthy)
return nil
}
// RateLimited
if resp.StatusCode == http.StatusTooManyRequests {
c.logger.Warnf("Transport has been rate limited: response status code: %d", resp.StatusCode)
c.UpdateStatus(ctx, RateLimited)
return nil
}
// Auth errors
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
logBodyErrors(c.logger, resp)
c.UpdateStatus(ctx, Failing)
return nil
}
// ClientErrors
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
logBodyErrors(c.logger, resp)
c.UpdateStatus(ctx, ClientFailing)
return nil
}
// Critical errors
if resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusServiceUnavailable {
logBodyErrors(c.logger, resp)
c.UpdateStatus(ctx, Failing)
return nil
}
c.logger.Warnf("unhandled status code: %d", resp.StatusCode)
return nil
}
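
// logBodyErrors reads an error response body from the APM server and logs the
// individual intake errors when the body can be decoded, falling back to
// logging the raw body otherwise.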
func logBodyErrors(logger *zap.SugaredLogger, resp *http.Response) {
b, err := io.ReadAll(resp.Body)
if err != nil {
logger.Warnf("failed to post data to APM server: response status: %s: failed to read response body: %v", resp.Status, err)
return
}
jErr := jsonResult{}
if err := json.Unmarshal(b, &jErr); err != nil {
logger.Warnf("failed to post data to APM server: response status: %s: failed to decode response body: %v: body: %s", resp.Status, err, string(b))
return
}
if len(jErr.Errors) == 0 {
logger.Warnf("failed to post data to APM server: response status: %s: response body: %s", resp.Status, string(b))
return
}
logger.Warnf("failed to post data to APM server: response status: %s", resp.Status)
for _, err := range jErr.Errors {
logger.Warnf("document %s: message: %s", err.Document, err.Message)
}
}

// IsUnhealthy reports whether the APM server transport is in the Failing state.
func (c *Client) IsUnhealthy() bool {
c.mu.RLock()
defer c.mu.RUnlock()
return c.Status == Failing
}

// UpdateStatus takes a state of the APM server transport and updates
// the current state of the transport. For a change to a failing state, a
// grace period is calculated and a goroutine is started that waits for that
// period to elapse before resetting the status to Started, allowing a
// subsequent send attempt to the APM server.
//
// This function is public for use in tests.
func (c *Client) UpdateStatus(ctx context.Context, status Status) {
// Reduce lock contention as UpdateStatus is called on every
// successful request
c.mu.RLock()
if status == c.Status {
c.mu.RUnlock()
return
}
c.mu.RUnlock()
switch status {
case Healthy:
c.mu.Lock()
if c.Status == status {
c.mu.Unlock()
return
}
c.Status = status
c.logger.Debugf("APM server Transport status set to %s", c.Status)
c.ReconnectionCount = -1
c.mu.Unlock()
case RateLimited, ClientFailing:
// No need to start backoff, this is a temporary status. It usually
// means we went over the limit of events/s.
c.mu.Lock()
c.Status = status
c.logger.Debugf("APM server Transport status set to %s", c.Status)
c.mu.Unlock()
case Failing:
c.mu.Lock()
c.Status = status
c.logger.Debugf("APM server Transport status set to %s", c.Status)
c.ReconnectionCount++
gracePeriodTimer := time.NewTimer(c.ComputeGracePeriod())
c.logger.Debugf("Grace period entered, reconnection count : %d", c.ReconnectionCount)
c.mu.Unlock()
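// Wait out the grace period (or context cancellation) in the background,
// then move back to Started to allow a new connection attempt.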
go func() {
select {
case <-gracePeriodTimer.C:
c.logger.Debug("Grace period over - timer timed out")
case <-ctx.Done():
c.logger.Debug("Grace period over - context done")
}
c.mu.Lock()
c.Status = Started
c.logger.Debugf("APM server Transport status set to %s", c.Status)
c.mu.Unlock()
}()
default:
c.logger.Errorf("Cannot set APM server Transport status to %s", status)
}
}

// ComputeGracePeriod returns the duration to wait before another connection
// attempt is allowed, following the APM agents transport spec:
// https://github.com/elastic/apm/blob/main/specs/agents/transport.md#transport-errors
func (c *Client) ComputeGracePeriod() time.Duration {
// For the first reconnection (count 0), return a random duration in [0s, 5s).
// A fixed grace period of 0 led to collisions when multiple environments
// reconnected at the same time.
if c.ReconnectionCount == 0 {
gracePeriod := rand.Float64() * 5 //nolint:gosec
return time.Duration(gracePeriod * float64(time.Second))
}
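// The grace period grows quadratically with the reconnection count, capped
// at 6^2 = 36 seconds, with +/-10% jitter applied.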
gracePeriodWithoutJitter := math.Pow(math.Min(float64(c.ReconnectionCount), 6), 2)
jitter := rand.Float64()/5 - 0.1 //nolint:gosec
return time.Duration((gracePeriodWithoutJitter + jitter*gracePeriodWithoutJitter) * float64(time.Second))
}

// ShouldFlush returns true if the client should flush APM data after processing the event.
func (c *Client) ShouldFlush() bool {
return c.sendStrategy == SyncFlush
}

// ResetFlush resets the client's "agent flushed" state, such that
// subsequent calls to WaitForFlush will block until another request
// is received from the agent indicating it has flushed.
func (c *Client) ResetFlush() {
c.flushMutex.Lock()
defer c.flushMutex.Unlock()
c.flushCh = make(chan struct{})
}

// WaitForFlush returns a channel that is closed when the agent has signaled that
// the Lambda invocation has completed, and there is no more APM data coming.
func (c *Client) WaitForFlush() <-chan struct{} {
c.flushMutex.Lock()
defer c.flushMutex.Unlock()
return c.flushCh
}
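
// forwardAgentData adds agent data to the current batch and ships the batch
// to the APM server once it is ready.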
func (c *Client) forwardAgentData(ctx context.Context, apmData accumulator.APMData) error {
if err := c.batch.AddAgentData(apmData); err != nil {
c.logger.Warnf("Dropping agent data due to error: %v", err)
}
if c.batch.ShouldShip() {
return c.sendBatch(ctx)
}
return nil
}
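
// forwardLambdaData adds lambda log data to the current batch and ships the
// batch to the APM server once it is ready.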
func (c *Client) forwardLambdaData(ctx context.Context, data []byte) error {
if err := c.batch.AddLambdaData(data); err != nil {
c.logger.Warnf("Dropping lambda data due to error: %v", err)
}
if c.batch.ShouldShip() {
return c.sendBatch(ctx)
}
return nil
}
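
// sendBatch posts the currently batched data, if any, to the APM server and
// resets the batch.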
func (c *Client) sendBatch(ctx context.Context) error {
if c.batch == nil || c.batch.Count() == 0 {
return nil
}
defer c.batch.Reset()
return c.PostToApmServer(ctx, c.batch.ToAPMData())
}