transport/http.go (546 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package transport // import "go.elastic.co/apm/v2/transport"
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/rand"
"mime/multipart"
"net/http"
"net/textproto"
"net/url"
"os"
"path"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/pkg/errors"
"go.elastic.co/apm/v2/apmconfig"
"go.elastic.co/apm/v2/internal/apmversion"
"go.elastic.co/apm/v2/internal/configutil"
)
const (
intakePath = "/intake/v2/events"
profilePath = "/intake/v2/profile"
configPath = "/config/v1/agents"
envAPIKey = "ELASTIC_APM_API_KEY"
envSecretToken = "ELASTIC_APM_SECRET_TOKEN"
envServerURLs = "ELASTIC_APM_SERVER_URLS"
envServerURL = "ELASTIC_APM_SERVER_URL"
envServerTimeout = "ELASTIC_APM_SERVER_TIMEOUT"
envServerCert = "ELASTIC_APM_SERVER_CERT"
envServerCACert = "ELASTIC_APM_SERVER_CA_CERT_FILE"
)
var (
// Take a copy of the http.DefaultTransport pointer,
// in case another package replaces the value later.
defaultHTTPTransport = http.DefaultTransport.(*http.Transport)
defaultServerURL, _ = url.Parse("http://localhost:8200")
defaultServerTimeout = 30 * time.Second
)
// HTTPTransportOptions for the HTTPTransport.
type HTTPTransportOptions struct {
// APIKey holds the base64-encoded API Key credential string, used for
// authenticating the agent. APIKey takes precedence over SecretToken.
//
// If unspecified, APIKey will be initialized using the
// ELASTIC_APM_API_KEY environment variable.
APIKey string
// SecretToken holds the secret token configured in the APM Server, used
// for authenticating the agent.
//
// If unspecified, SecretToken will be initialized using the
// ELASTIC_APM_SECRET_TOKEN environment variable.
SecretToken string
// ServerURLs holds the URLs for your Elastic APM Server. The Server
// supports both HTTP and HTTPS. If you use HTTPS, then you may need to
// configure your client machines so that the server certificate can be
// verified. You can disable certificate verification with SkipServerVerify.
//
// If no URLs are specified, then ServerURLs will be initialized using the
// ELASTIC_APM_SERVER_URL environment variable, defaulting to
// "http://localhost:8200" if the environment variable is not set.
ServerURLs []*url.URL
// ServerTimeout holds the timeout for requests made to your Elastic APM
// server.
//
// When set to zero, it will default to 30 seconds. Negative values
// are not allowed.
//
// If ServerTimeout is zero, then it will be initialized using the
// ELASTIC_APM_SERVER_TIMEOUT environment variable, defaulting to
// 30 seconds if the environment variable is not set. Negative values are
// not allowed, and will cause NewHTTPTransport to return an error.
ServerTimeout time.Duration
// TLSClientConfig holds client TLS configuration for use in the HTTP client.
//
// If TLS is nil, TLS will be constructed using the following environment
// variables:
//
// - ELASTIC_APM_SERVER_CERT: the path to a PEM-encoded TLS certificate
// that must match the APM Server-supplied certificate. This can be used
// to pin a self signed certificate.
//
// - ELASTIC_APM_SERVER_CA_CERT_FILE: the path to a PEM-encoded TLS
// Certificate Authority certificate that will be used for verifying
// the server's TLS certificate chain.
//
// - ELASTIC_APM_VERIFY_SERVER_CERT: flag to control verification of the
// APM Server's TLS certificates. If ELASTIC_APM_SERVER_CERT is defined,
// ELASTIC_APM_VERIFY_SERVER_CERT is ignored.
TLSClientConfig *tls.Config
// UserAgent holds the value to use for the User-Agent header.
//
// If unspecified, UserAgent will be set to the value returned by
// DefaultUserAgent().
UserAgent string
}
// Validate ensures the HTTPTransportOptions are valid.
func (opts HTTPTransportOptions) Validate() error {
if opts.ServerTimeout < 0 {
return errors.New("apm transport options: ServerTimeout must be greater or equal to 0")
}
return nil
}
// HTTPTransport is an implementation of Transport, sending payloads via
// a net/http client.
type HTTPTransport struct {
// Client exposes the http.Client used by the HTTPTransport for
// sending requests to the APM Server.
Client *http.Client
intakeHeaders http.Header
configHeaders http.Header
profileHeaders http.Header
rootHeaders http.Header
shuffleRand *rand.Rand
urlIndex int32
intakeURLs []*url.URL
configURLs []*url.URL
profileURLs []*url.URL
majorServerVersion uint32
}
// NewHTTPTransport returns a new HTTPTransport, initialized with opts,
// which can be used for streaming data to the APM Server.
func NewHTTPTransport(opts HTTPTransportOptions) (*HTTPTransport, error) {
if opts.APIKey == "" {
opts.APIKey = os.Getenv(envAPIKey)
}
if len(opts.ServerURLs) == 0 {
serverURLs, err := initServerURLs()
if err != nil {
return nil, err
}
opts.ServerURLs = serverURLs
}
if opts.TLSClientConfig == nil {
tlsClientConfig, err := newEnvTLSClientConfig()
if err != nil {
return nil, err
}
opts.TLSClientConfig = tlsClientConfig
}
if opts.SecretToken == "" && opts.APIKey == "" {
opts.SecretToken = os.Getenv(envSecretToken)
}
if opts.ServerTimeout == 0 {
serverTimeout, err := configutil.ParseDurationEnv(envServerTimeout, defaultServerTimeout)
if err != nil {
return nil, err
}
opts.ServerTimeout = serverTimeout
}
return newHTTPTransportOptions(opts)
}
func newHTTPTransportOptions(opts HTTPTransportOptions) (*HTTPTransport, error) {
if err := opts.Validate(); err != nil {
return nil, err
}
// If the ServerTimeout is unspecified, set it to defaultServerTimeout.
client := &http.Client{
Timeout: opts.ServerTimeout,
Transport: &http.Transport{
Proxy: defaultHTTPTransport.Proxy,
DialContext: defaultHTTPTransport.DialContext,
MaxIdleConns: defaultHTTPTransport.MaxIdleConns,
IdleConnTimeout: defaultHTTPTransport.IdleConnTimeout,
TLSHandshakeTimeout: defaultHTTPTransport.TLSHandshakeTimeout,
ExpectContinueTimeout: defaultHTTPTransport.ExpectContinueTimeout,
TLSClientConfig: opts.TLSClientConfig,
},
}
commonHeaders := make(http.Header)
if opts.UserAgent == "" {
opts.UserAgent = DefaultUserAgent()
}
commonHeaders.Set("User-Agent", opts.UserAgent)
intakeHeaders := copyHeaders(commonHeaders)
intakeHeaders.Set("Content-Type", "application/x-ndjson")
intakeHeaders.Set("Content-Encoding", "deflate")
intakeHeaders.Set("Transfer-Encoding", "chunked")
profileHeaders := copyHeaders(commonHeaders)
t := &HTTPTransport{
Client: client,
configHeaders: commonHeaders,
intakeHeaders: intakeHeaders,
profileHeaders: profileHeaders,
rootHeaders: copyHeaders(commonHeaders),
}
if opts.APIKey != "" {
t.SetAPIKey(opts.APIKey)
} else if opts.SecretToken != "" {
t.SetSecretToken(opts.SecretToken)
}
if len(opts.ServerURLs) == 0 {
opts.ServerURLs = []*url.URL{defaultServerURL}
}
if err := t.SetServerURL(opts.ServerURLs...); err != nil {
return nil, err
}
return t, nil
}
func newEnvTLSClientConfig() (*tls.Config, error) {
verifyServerCert, err := checkVerifyServerCert()
if err != nil {
return nil, err
}
tlsClientConfig := &tls.Config{InsecureSkipVerify: !verifyServerCert}
err = addCertPath(tlsClientConfig)
if err != nil {
return nil, err
}
return tlsClientConfig, nil
}
// SetServerURL sets the APM Server URL (or URLs) for sending requests.
// At least one URL must be specified, or the method will return an error.
// The list will be randomly shuffled.
func (t *HTTPTransport) SetServerURL(u ...*url.URL) error {
if len(u) == 0 {
return errors.New("SetServerURL expects at least one URL")
}
intakeURLs := make([]*url.URL, len(u))
configURLs := make([]*url.URL, len(u))
profileURLs := make([]*url.URL, len(u))
for i, u := range u {
intakeURLs[i] = urlWithPath(u, intakePath)
configURLs[i] = urlWithPath(u, configPath)
profileURLs[i] = urlWithPath(u, profilePath)
}
if n := len(intakeURLs); n > 0 {
if t.shuffleRand == nil {
t.shuffleRand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
for i := n - 1; i > 0; i-- {
j := t.shuffleRand.Intn(i + 1)
intakeURLs[i], intakeURLs[j] = intakeURLs[j], intakeURLs[i]
configURLs[i], configURLs[j] = configURLs[j], configURLs[i]
profileURLs[i], profileURLs[j] = profileURLs[j], profileURLs[i]
}
}
t.intakeURLs = intakeURLs
t.configURLs = configURLs
t.profileURLs = profileURLs
t.urlIndex = 0
return nil
}
// SetUserAgent sets the User-Agent header that will be sent with each request.
func (t *HTTPTransport) SetUserAgent(ua string) {
t.setCommonHeader("User-Agent", ua)
}
// SetSecretToken sets the Authorization header with the given secret token.
//
// This overrides the value specified via the ELASTIC_APM_SECRET_TOKEN or
// ELASTIC_APM_API_KEY environment variables, if either are set.
func (t *HTTPTransport) SetSecretToken(secretToken string) {
if secretToken != "" {
t.setCommonHeader("Authorization", "Bearer "+secretToken)
} else {
t.deleteCommonHeader("Authorization")
}
}
// SetAPIKey sets the Authorization header with the given API Key.
//
// This overrides the value specified via the ELASTIC_APM_SECRET_TOKEN or
// ELASTIC_APM_API_KEY environment variables, if either are set.
func (t *HTTPTransport) SetAPIKey(apiKey string) {
if apiKey != "" {
t.setCommonHeader("Authorization", "ApiKey "+apiKey)
} else {
t.deleteCommonHeader("Authorization")
}
}
func (t *HTTPTransport) setCommonHeader(key, value string) {
t.configHeaders.Set(key, value)
t.rootHeaders.Set(key, value)
t.intakeHeaders.Set(key, value)
t.profileHeaders.Set(key, value)
}
func (t *HTTPTransport) deleteCommonHeader(key string) {
t.configHeaders.Del(key)
t.rootHeaders.Del(key)
t.intakeHeaders.Del(key)
t.profileHeaders.Del(key)
}
// SendStream sends the stream over HTTP. If SendStream returns an error and
// the transport is configured with more than one APM Server URL, then the
// following request will be sent to the next URL in the list.
func (t *HTTPTransport) SendStream(ctx context.Context, r io.Reader) error {
urlIndex := atomic.LoadInt32(&t.urlIndex)
intakeURL := t.intakeURLs[urlIndex]
req := t.newRequest("POST", intakeURL)
req = requestWithContext(ctx, req)
req.Header = t.intakeHeaders
req.Body = ioutil.NopCloser(r)
if err := t.sendStreamRequest(req); err != nil {
atomic.StoreInt32(&t.urlIndex, (urlIndex+1)%int32(len(t.intakeURLs)))
// The remote APM Server url has changed, so we invalidate the local
// Major Server version cache.
atomic.StoreUint32(&t.majorServerVersion, 0)
return err
}
return nil
}
func (t *HTTPTransport) sendStreamRequest(req *http.Request) error {
resp, err := t.Client.Do(req)
if err != nil {
return errors.Wrap(err, "sending event request failed")
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK, http.StatusAccepted:
return nil
}
result := newHTTPError(resp)
if resp.StatusCode == http.StatusNotFound && result.Message == "404 page not found" {
// This may be an old (pre-6.5) APM server
// that does not support the v2 intake API.
result.Message = fmt.Sprintf("%s not found (requires APM Server 6.5.0 or newer)", req.URL)
}
return result
}
// SendProfile sends a symbolised pprof profile, encoded as protobuf, and gzip-compressed.
//
// NOTE this is an experimental API, and may be removed in a future minor version, without
// being considered a breaking change.
func (t *HTTPTransport) SendProfile(
ctx context.Context,
metadataReader io.Reader,
profileReaders ...io.Reader,
) error {
urlIndex := atomic.LoadInt32(&t.urlIndex)
profileURL := t.profileURLs[urlIndex]
req := t.newRequest("POST", profileURL)
req = requestWithContext(ctx, req)
req.Header = t.profileHeaders
writeBody := func(w *multipart.Writer) error {
h := make(textproto.MIMEHeader)
h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="metadata"`))
h.Set("Content-Type", "application/json")
part, err := w.CreatePart(h)
if err != nil {
return err
}
if _, err := io.Copy(part, metadataReader); err != nil {
return err
}
for _, profileReader := range profileReaders {
h = make(textproto.MIMEHeader)
h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="profile"`))
h.Set("Content-Type", `application/x-protobuf; messageType="perftools.profiles.Profile"`)
part, err = w.CreatePart(h)
if err != nil {
return err
}
if _, err := io.Copy(part, profileReader); err != nil {
return err
}
}
return w.Close()
}
pipeR, pipeW := io.Pipe()
mpw := multipart.NewWriter(pipeW)
req.Header.Set("Content-Type", mpw.FormDataContentType())
req.Body = pipeR
go func() {
err := writeBody(mpw)
pipeW.CloseWithError(err)
}()
return t.sendProfileRequest(req)
}
func (t *HTTPTransport) sendProfileRequest(req *http.Request) error {
resp, err := t.Client.Do(req)
if err != nil {
return errors.Wrap(err, "sending profile request failed")
}
switch resp.StatusCode {
case http.StatusOK, http.StatusAccepted:
resp.Body.Close()
return nil
}
defer resp.Body.Close()
result := newHTTPError(resp)
if resp.StatusCode == http.StatusNotFound && result.Message == "404 page not found" {
// TODO(axw) correct minimum server version.
result.Message = fmt.Sprintf("%s not found (requires APM Server 7.5.0 or newer)", req.URL)
}
return result
}
// WatchConfig polls the APM Server for agent config changes, sending
// them over the returned channel.
func (t *HTTPTransport) WatchConfig(ctx context.Context, args apmconfig.WatchParams) <-chan apmconfig.Change {
changes := make(chan apmconfig.Change)
go func() {
defer close(changes)
var etag string
var out chan apmconfig.Change
var change apmconfig.Change
timer := time.NewTimer(0)
for {
select {
case <-ctx.Done():
return
case out <- change:
out = nil
change = apmconfig.Change{}
continue
case <-timer.C:
}
urlIndex := atomic.LoadInt32(&t.urlIndex)
query := make(url.Values)
query.Set("service.name", args.Service.Name)
if args.Service.Environment != "" {
query.Set("service.environment", args.Service.Environment)
}
url := *t.configURLs[urlIndex]
url.RawQuery = query.Encode()
req := t.newRequest("GET", &url)
req.Header = t.configHeaders
if etag != "" {
req.Header = copyHeaders(req.Header)
req.Header.Set("If-None-Match", strconv.QuoteToASCII(etag))
}
req = requestWithContext(ctx, req)
resp := t.configRequest(req)
var send bool
if resp.err != nil {
// The request will have failed if the context has been
// cancelled. No need to send a a change in this case.
send = ctx.Err() == nil
}
if !send && resp.attrs != nil {
etag = resp.etag
send = true
}
if send {
change = apmconfig.Change{Err: resp.err, Attrs: resp.attrs}
out = changes
}
timer.Reset(resp.maxAge)
}
}()
return changes
}
func (t *HTTPTransport) configRequest(req *http.Request) configResponse {
// defaultMaxAge is the default amount of time to wait between
// requests. This should only be used when the server does not
// respond with a Cache-Control header, or where the header is
// malformed.
const defaultMaxAge = 5 * time.Minute
resp, err := t.Client.Do(req)
if err != nil {
// TODO(axw) this might indicate that the APM Server is unavailable.
// In this case, we should allow a change in URL due to SendStream
// to cut the defaultMaxAge delay short.
return configResponse{
err: errors.Wrap(err, "sending config request failed"),
maxAge: defaultMaxAge,
}
}
defer resp.Body.Close()
var response configResponse
if etag, err := strconv.Unquote(resp.Header.Get("Etag")); err == nil {
response.etag = etag
}
cacheControl := parseCacheControl(resp.Header.Get("Cache-Control"))
response.maxAge = cacheControl.maxAge
if response.maxAge < 0 {
response.maxAge = defaultMaxAge
}
switch resp.StatusCode {
case http.StatusNotModified, http.StatusForbidden, http.StatusNotFound:
// 304 (Not Modified) is returned when the config has not changed since the previous query.
// 403 (Forbidden) is returned if the server does not have the connection to Kibana enabled.
// 404 (Not Found) is returned by old servers that do not implement the config endpoint.
return response
case http.StatusOK:
attrs := make(map[string]string)
// TODO(axw) handling EOF shouldn't be necessary, server currently responds with an empty
// body when there is no config.
if err := json.NewDecoder(resp.Body).Decode(&attrs); err != nil && err != io.EOF {
response.err = err
} else {
response.attrs = attrs
}
return response
}
response.err = newHTTPError(resp)
if response.maxAge < 5*time.Second {
response.maxAge = 5 * time.Second
}
return response
}
// serverInfo represents the APM Server information as exposed in the `/`
// endpoint. Not all fields may be modeled in this structure.
type serverInfo struct {
// Version holds the APM Server version.
Version string `json:"version,omitempty"`
}
// MajorServerVersion returns the APM Server's major version. When refreshStale
// is true` it will request the remote APM Server's version from `/`, otherwise
// it will return the cached version. If the returned first argument is 0, the
// cache is stale.
func (t *HTTPTransport) MajorServerVersion(ctx context.Context, refreshStale bool) uint32 {
if v := atomic.LoadUint32(&t.majorServerVersion); v > 0 || !refreshStale {
return v
}
return t.refreshMajorServerVersion(ctx)
}
// RefreshVersion queries the "active" remote APM Server and caches the result
// locally when the operation succeeds.
func (t *HTTPTransport) refreshMajorServerVersion(ctx context.Context) uint32 {
srvURL := t.intakeURLs[atomic.LoadInt32(&t.urlIndex)]
u := *srvURL
u.Path, u.RawPath = "", ""
req := requestWithContext(ctx, t.newRequest("GET", urlWithPath(&u, "/")))
req.Header = t.rootHeaders
res, err := t.Client.Do(req)
if err != nil {
return 0
}
defer res.Body.Close()
var resp serverInfo
if err := json.NewDecoder(res.Body).Decode(&resp); err != nil {
return 0
}
if resp.Version != "" {
if v, ok := parseMajorVersion(resp.Version); ok {
atomic.StoreUint32(&t.majorServerVersion, v)
return v
}
}
return atomic.LoadUint32(&t.majorServerVersion)
}
func (t *HTTPTransport) newRequest(method string, url *url.URL) *http.Request {
req := &http.Request{
Method: method,
URL: url,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Host: url.Host,
}
return req
}
func urlWithPath(url *url.URL, p string) *url.URL {
urlCopy := *url
urlCopy.Path = path.Clean(urlCopy.Path + p)
if urlCopy.RawPath != "" {
urlCopy.RawPath = path.Clean(urlCopy.RawPath + p)
}
return &urlCopy
}
// HTTPError is an error returned by HTTPTransport methods when requests fail.
type HTTPError struct {
Response *http.Response
Message string
}
func newHTTPError(resp *http.Response) *HTTPError {
bodyContents, err := ioutil.ReadAll(resp.Body)
if err == nil {
resp.Body = ioutil.NopCloser(bytes.NewReader(bodyContents))
}
return &HTTPError{
Response: resp,
Message: strings.TrimSpace(string(bodyContents)),
}
}
func (e *HTTPError) Error() string {
msg := fmt.Sprintf("request failed with %s", e.Response.Status)
if e.Message != "" {
msg += ": " + e.Message
}
return msg
}
// initServerURLs parses ELASTIC_APM_SERVER_URLS if specified,
// otherwise parses ELASTIC_APM_SERVER_URL if specified. If
// neither are specified, then the default localhost URL is
// returned.
func initServerURLs() ([]*url.URL, error) {
key := envServerURLs
value := os.Getenv(key)
if value == "" {
key = envServerURL
value = os.Getenv(key)
}
var urls []*url.URL
for _, field := range strings.Split(value, ",") {
field = strings.TrimSpace(field)
if field == "" {
continue
}
u, err := url.Parse(field)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse %s", key)
}
urls = append(urls, u)
}
if len(urls) == 0 {
urls = []*url.URL{defaultServerURL}
}
return urls, nil
}
func requestWithContext(ctx context.Context, req *http.Request) *http.Request {
url := req.URL
req.URL = nil
reqCopy := req.WithContext(ctx)
reqCopy.URL = url
req.URL = url
return reqCopy
}
// DefaultUserAgent returns the default value to use for the User-Agent header:
// apm-agent-go/<agent-version>.
func DefaultUserAgent() string {
return fmt.Sprintf("apm-agent-go/%s", apmversion.AgentVersion)
}
func copyHeaders(in http.Header) http.Header {
out := make(http.Header, len(in))
for k, vs := range in {
vsCopy := make([]string, len(vs))
copy(vsCopy, vs)
out[k] = vsCopy
}
return out
}
type configResponse struct {
err error
attrs map[string]string
etag string
maxAge time.Duration
}
type cacheControl struct {
maxAge time.Duration
}
func parseCacheControl(s string) cacheControl {
fields := strings.SplitN(s, "max-age=", 2)
if len(fields) < 2 {
return cacheControl{maxAge: -1}
}
s = fields[1]
if i := strings.IndexRune(s, ','); i != -1 {
s = s[:i]
}
maxAge, err := strconv.ParseUint(s, 10, 32)
if err != nil {
return cacheControl{maxAge: -1}
}
return cacheControl{maxAge: time.Duration(maxAge) * time.Second}
}
// parseMajorVersion returns the major version given a version string. Accepts
// the string as long as it contains a `.` and the runes preceding `.` can be
// parsed to a number. If the operation succeeded, the second return value will
// be true.
func parseMajorVersion(v string) (uint32, bool) {
i := strings.IndexRune(v, '.')
if i == -1 {
return 0, false
}
major, err := strconv.Atoi(v[:i])
if err != nil {
return 0, false
}
return uint32(major), true
}