apmproxy/receiver.go (139 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apmproxy
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
"time"
"github.com/elastic/apm-aws-lambda/accumulator"
"github.com/elastic/apm-aws-lambda/version"
)
const txnRegistrationContentType = "application/vnd.elastic.apm.transaction+ndjson"
// StartReceiver starts the server listening for APM agent data.
func (c *Client) StartReceiver() error {
mux := http.NewServeMux()
handleInfoRequest, err := c.handleInfoRequest()
if err != nil {
return err
}
mux.HandleFunc("/", handleInfoRequest)
mux.HandleFunc("/intake/v2/events", c.handleIntakeV2Events())
mux.HandleFunc("/register/transaction", c.handleTransactionRegistration())
c.receiver.Handler = mux
ln, err := net.Listen("tcp", c.receiver.Addr)
if err != nil {
return fmt.Errorf("failed to listen on addr %s", c.receiver.Addr)
}
go func() {
c.logger.Infof("Extension listening for apm data on %s", c.receiver.Addr)
if err = c.receiver.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
c.logger.Errorf("received error from http.Serve(): %v", err)
} else {
c.logger.Debug("server closed")
}
}()
return nil
}
// Shutdown shutdowns the apm receiver gracefully.
func (c *Client) Shutdown() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return c.receiver.Shutdown(ctx)
}
// URL: http://server/
func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Request), error) {
// Init reverse proxy
parsedApmServerURL, err := url.Parse(c.serverURL)
if err != nil {
return nil, fmt.Errorf("could not parse APM server URL: %w", err)
}
reverseProxy := httputil.NewSingleHostReverseProxy(parsedApmServerURL)
reverseProxy.Transport = c.client.Transport.(*http.Transport).Clone()
reverseProxy.ErrorHandler = func(w http.ResponseWriter, _ *http.Request, err error) {
// Don't update the status of the transport as it is possible that the extension
// is frozen while processing the request and context is canceled due to timeout.
c.logger.Errorf("Error querying version from the APM server: %v", err)
// Server is unreachable, return StatusBadGateway (default behavior) to avoid
// returning a Status OK.
w.WriteHeader(http.StatusBadGateway)
}
return func(w http.ResponseWriter, r *http.Request) {
c.logger.Debug("Handling APM server Info Request")
// Process request (the Golang doc suggests removing any pre-existing X-Forwarded-For header coming
// from the client or an untrusted proxy to prevent IP spoofing : https://pkg.go.dev/net/http/httputil#ReverseProxy
r.Header.Del("X-Forwarded-For")
// Update headers to allow for SSL redirection
r.URL.Host = parsedApmServerURL.Host
r.URL.Scheme = parsedApmServerURL.Scheme
r.Header.Set("X-Forwarded-Host", r.Header.Get("Host"))
reqAgent := r.UserAgent()
r.Header.Set("User-Agent", version.UserAgent+" "+reqAgent)
r.Host = parsedApmServerURL.Host
// Override authorization header sent by the APM agents
if c.ServerAPIKey != "" {
r.Header.Add("Authorization", "ApiKey "+c.ServerAPIKey)
} else if c.ServerSecretToken != "" {
r.Header.Add("Authorization", "Bearer "+c.ServerSecretToken)
}
// Forward request to the APM server
reverseProxy.ServeHTTP(w, r)
}, nil
}
// URL: http://server/intake/v2/events
func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
c.logger.Debug("Handling APM Data Intake")
rawBytes, err := io.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
c.logger.Errorf("Could not read agent intake request body: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
agentFlushed := r.URL.Query().Get("flushed") == "true"
if len(rawBytes) != 0 {
agentData := accumulator.APMData{
Data: rawBytes,
ContentEncoding: r.Header.Get("Content-Encoding"),
AgentInfo: r.UserAgent(),
}
select {
case c.AgentDataChannel <- agentData:
default:
c.logger.Warnf("Channel full: dropping a subset of agent data")
}
} else {
c.logger.Debugf("Received empy request from '%s'", r.UserAgent())
}
if agentFlushed {
c.flushMutex.Lock()
select {
case <-c.flushCh:
// the channel is closed.
// the extension received at least a flush request already but the
// data have not been flushed yet.
// We can reuse the closed channel.
default:
// no pending flush requests
// close the channel to signal a flush request has
// been received.
close(c.flushCh)
}
c.flushMutex.Unlock()
}
w.WriteHeader(http.StatusAccepted)
if _, err = w.Write([]byte("ok")); err != nil {
c.logger.Errorf("Failed to send intake response to APM agent : %v", err)
}
}
}
// URL: http://server/register/transaction
func (c *Client) handleTransactionRegistration() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Content-Type") != txnRegistrationContentType {
w.WriteHeader(http.StatusUnsupportedMediaType)
return
}
reqID := r.Header.Get("x-elastic-aws-request-id")
if reqID == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
rawBytes, err := io.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
c.logger.Warnf("Failed to read transaction registration body: %v", err)
w.WriteHeader(http.StatusBadRequest)
return
}
if err := c.batch.OnAgentInit(
reqID, r.Header.Get("Content-Encoding"), rawBytes,
); err != nil {
c.logger.Warnf("Failed to update invocation: %v", err)
w.WriteHeader(http.StatusUnprocessableEntity)
return
}
}
}