func()

in apmproxy/apmserver.go [132:223]


func (c *Client) PostToApmServer(ctx context.Context, apmData accumulator.APMData) error {
	// todo: can this be a streaming or streaming style call that keeps the
	//       connection open across invocations?
	if c.IsUnhealthy() {
		return errors.New("transport status is unhealthy")
	}

	endpointURI := "intake/v2/events"
	encoding := apmData.ContentEncoding
	agentInfo := apmData.AgentInfo

	var r io.Reader
	if apmData.ContentEncoding != "" {
		r = bytes.NewReader(apmData.Data)
	} else {
		encoding = "gzip"
		buf := c.bufferPool.Get().(*bytes.Buffer)
		defer func() {
			buf.Reset()
			c.bufferPool.Put(buf)
		}()
		gw, err := gzip.NewWriterLevel(buf, gzip.BestSpeed)
		if err != nil {
			return err
		}
		if _, err := gw.Write(apmData.Data); err != nil {
			return fmt.Errorf("failed to compress data: %w", err)
		}
		if err := gw.Close(); err != nil {
			return fmt.Errorf("failed to write compressed data to buffer: %w", err)
		}
		r = buf
	}

	req, err := http.NewRequest(http.MethodPost, c.serverURL+endpointURI, r)
	if err != nil {
		return fmt.Errorf("failed to create a new request when posting to APM server: %v", err)
	}
	req.Header.Add("Content-Encoding", encoding)
	req.Header.Add("Content-Type", "application/x-ndjson")
	req.Header.Set("User-Agent", version.UserAgent+" "+agentInfo)
	if c.ServerAPIKey != "" {
		req.Header.Add("Authorization", "ApiKey "+c.ServerAPIKey)
	} else if c.ServerSecretToken != "" {
		req.Header.Add("Authorization", "Bearer "+c.ServerSecretToken)
	}

	c.logger.Debug("Sending data chunk to APM server")
	resp, err := c.client.Do(req)
	if err != nil {
		c.UpdateStatus(ctx, Failing)
		return fmt.Errorf("failed to post to APM server: %v", err)
	}
	defer resp.Body.Close()

	// On success, the server will respond with a 202 Accepted status code and no body.
	if resp.StatusCode == http.StatusAccepted {
		c.UpdateStatus(ctx, Healthy)
		return nil
	}

	// RateLimited
	if resp.StatusCode == http.StatusTooManyRequests {
		c.logger.Warnf("Transport has been rate limited: response status code: %d", resp.StatusCode)
		c.UpdateStatus(ctx, RateLimited)
		return nil
	}

	// Auth errors
	if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
		logBodyErrors(c.logger, resp)
		c.UpdateStatus(ctx, Failing)
		return nil
	}

	// ClientErrors
	if resp.StatusCode >= 400 && resp.StatusCode < 500 {
		logBodyErrors(c.logger, resp)
		c.UpdateStatus(ctx, ClientFailing)
		return nil
	}

	// critical errors
	if resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusServiceUnavailable {
		logBodyErrors(c.logger, resp)
		c.UpdateStatus(ctx, Failing)
		return nil
	}

	c.logger.Warnf("unhandled status code: %d", resp.StatusCode)
	return nil
}