loadgen/eventhandler/apm.go (94 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package eventhandler
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"go.uber.org/zap"
)
func NewAPM(logger *zap.Logger, config Config) (*Handler, error) {
config.Writer = writeAPMEvents
return New(logger, config, &APMEventCollector{})
}
// APMTransport sends the contents of a reader to a remote APMTransport Server.
type APMTransport struct {
logger *zap.Logger
client *http.Client
intakeHeaders http.Header
intakeV2URL string
}
// NewAPMTransport initializes a new ReplayTransport.
func NewAPMTransport(logger *zap.Logger, c *http.Client, srvURL, token, apiKey string, headers map[string]string) Transport {
intakeHeaders := make(http.Header)
intakeHeaders.Set("Content-Encoding", "deflate")
intakeHeaders.Set("Content-Type", "application/x-ndjson")
intakeHeaders.Set("Transfer-Encoding", "chunked")
intakeHeaders.Set("Authorization", getAuthHeader(token, apiKey))
for name, header := range headers {
intakeHeaders.Set(name, header)
}
return &APMTransport{
logger: logger,
client: c,
intakeV2URL: srvURL + `/intake/v2/events`,
intakeHeaders: intakeHeaders,
}
}
// SendEvents sends the reader contents to `/intake/v2/events` as a batch.
func (t *APMTransport) SendEvents(ctx context.Context, r io.Reader, ignoreErrs bool) error {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, t.intakeV2URL, r)
if err != nil {
return err
}
// Since the ContentLength will be automatically set on `bytes.Reader`,
// set it to `-1` just like the agents would.
req.ContentLength = -1
req.Header = t.intakeHeaders
req.Host = req.Header.Get("Host")
return t.sendEvents(req, ignoreErrs)
}
func (t *APMTransport) sendEvents(req *http.Request, ignoreErrs bool) error {
res, err := t.client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
defer logResponseOutcome(t.logger, res)
if !ignoreErrs {
switch res.StatusCode / 100 {
case 4:
return fmt.Errorf("unexpected client error: %d", res.StatusCode)
case 5:
return fmt.Errorf("unexpected server error: %d", res.StatusCode)
}
}
return nil
}
func logResponseOutcome(logger *zap.Logger, res *http.Response) {
if res == nil {
return
}
var body bytes.Buffer
if _, err := body.ReadFrom(res.Body); err != nil {
logger.Error("cannot read body", zap.Error(err))
}
destination := "unknown"
if res.Request != nil {
destination = res.Request.URL.String()
}
if res.StatusCode >= http.StatusBadRequest {
logger.Error("request failed",
zap.Int("status_code", res.StatusCode), zap.String("response", body.String()),
zap.String("destination", destination))
} else {
logger.Debug("request completed",
zap.Int("status_code", res.StatusCode), zap.String("response", body.String()),
zap.String("destination", destination))
}
}
func getAuthHeader(token string, apiKey string) string {
var auth string
if token != "" {
auth = "Bearer " + token
}
if apiKey != "" {
auth = "ApiKey " + apiKey
}
return auth
}