in apmproxy/apmserver.go [132:223]
// PostToApmServer sends apmData to the APM server's "intake/v2/events"
// endpoint and updates the client's transport status based on the outcome.
//
// If apmData.ContentEncoding is set, apmData.Data is sent as-is with that
// encoding; otherwise the payload is gzip-compressed (gzip.BestSpeed) into a
// pooled buffer before being sent.
//
// An error is returned when the transport is already unhealthy, when the
// payload cannot be compressed, when the request cannot be built, or when the
// HTTP round trip itself fails. Once a response has been received the method
// returns nil regardless of the status code; non-2xx responses are reported
// only through logging and UpdateStatus.
func (c *Client) PostToApmServer(ctx context.Context, apmData accumulator.APMData) error {
	// todo: can this be a streaming or streaming style call that keeps the
	//       connection open across invocations?
	if c.IsUnhealthy() {
		return errors.New("transport status is unhealthy")
	}

	// Upper bound on how much of an unread response body is discarded before
	// closing, so a misbehaving server cannot stall us indefinitely.
	const maxDrainBytes = 64 * 1024

	endpointURI := "intake/v2/events"
	encoding := apmData.ContentEncoding
	agentInfo := apmData.AgentInfo

	var r io.Reader
	if apmData.ContentEncoding != "" {
		// Payload is already encoded by the agent; forward it untouched.
		r = bytes.NewReader(apmData.Data)
	} else {
		encoding = "gzip"
		buf := c.bufferPool.Get().(*bytes.Buffer)
		// The buffer backs the request body, so it is only reset and handed
		// back to the pool when this function returns.
		defer func() {
			buf.Reset()
			c.bufferPool.Put(buf)
		}()
		gw, err := gzip.NewWriterLevel(buf, gzip.BestSpeed)
		if err != nil {
			return fmt.Errorf("failed to create gzip writer: %w", err)
		}
		if _, err := gw.Write(apmData.Data); err != nil {
			return fmt.Errorf("failed to compress data: %w", err)
		}
		// Close flushes the remaining compressed bytes into buf.
		if err := gw.Close(); err != nil {
			return fmt.Errorf("failed to write compressed data to buffer: %w", err)
		}
		r = buf
	}

	req, err := http.NewRequest(http.MethodPost, c.serverURL+endpointURI, r)
	if err != nil {
		return fmt.Errorf("failed to create a new request when posting to APM server: %w", err)
	}
	req.Header.Add("Content-Encoding", encoding)
	req.Header.Add("Content-Type", "application/x-ndjson")
	req.Header.Set("User-Agent", version.UserAgent+" "+agentInfo)
	// The API key takes precedence over the secret token when both are set.
	if c.ServerAPIKey != "" {
		req.Header.Add("Authorization", "ApiKey "+c.ServerAPIKey)
	} else if c.ServerSecretToken != "" {
		req.Header.Add("Authorization", "Bearer "+c.ServerSecretToken)
	}

	c.logger.Debug("Sending data chunk to APM server")
	resp, err := c.client.Do(req)
	if err != nil {
		c.UpdateStatus(ctx, Failing)
		return fmt.Errorf("failed to post to APM server: %w", err)
	}
	defer func() {
		// Discard any unread remainder (bounded) before closing so the
		// transport can reuse the keep-alive connection. The result is
		// deliberately ignored: io.EOF is the expected outcome here.
		_, _ = io.CopyN(io.Discard, resp.Body, maxDrainBytes)
		resp.Body.Close()
	}()

	// Case order matters: 429, 401 and 403 must be matched before the generic
	// 4xx case.
	switch code := resp.StatusCode; {
	case code == http.StatusAccepted:
		// On success, the server will respond with a 202 Accepted status code and no body.
		c.UpdateStatus(ctx, Healthy)
	case code == http.StatusTooManyRequests:
		// RateLimited
		c.logger.Warnf("Transport has been rate limited: response status code: %d", code)
		c.UpdateStatus(ctx, RateLimited)
	case code == http.StatusUnauthorized || code == http.StatusForbidden:
		// Auth errors
		logBodyErrors(c.logger, resp)
		c.UpdateStatus(ctx, Failing)
	case code >= 400 && code < 500:
		// ClientErrors
		logBodyErrors(c.logger, resp)
		c.UpdateStatus(ctx, ClientFailing)
	case code == http.StatusInternalServerError || code == http.StatusServiceUnavailable:
		// critical errors
		logBodyErrors(c.logger, resp)
		c.UpdateStatus(ctx, Failing)
	default:
		// Any other status leaves the transport status untouched.
		c.logger.Warnf("unhandled status code: %d", code)
	}
	return nil
}