internal/beater/http.go (127 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package beater import ( "context" "log" "net" "net/http" "net/url" "strings" "github.com/libp2p/go-reuseport" "go.uber.org/zap" "golang.org/x/net/netutil" "github.com/elastic/apm-server/internal/beater/api" "github.com/elastic/apm-server/internal/beater/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/transport/tlscommon" "github.com/elastic/gmux" ) type httpServer struct { *http.Server cfg *config.Config logger *logp.Logger grpcListener net.Listener httpListener net.Listener } func newHTTPServer( logger *logp.Logger, cfg *config.Config, handler http.Handler, listener net.Listener, ) (*httpServer, error) { server := &http.Server{ Addr: cfg.Host, Handler: handler, IdleTimeout: cfg.IdleTimeout, ReadTimeout: cfg.ReadTimeout, WriteTimeout: cfg.WriteTimeout, MaxHeaderBytes: cfg.MaxHeaderSize, ErrorLog: newErrorLog(logger), } if cfg.TLS.IsEnabled() { tlsServerConfig, err := tlscommon.LoadTLSServerConfig(cfg.TLS) if err != nil { return nil, err } server.TLSConfig = tlsServerConfig.BuildServerConfig("") } // Configure the server with gmux. The returned net.Listener will receive // gRPC connections, while all other requests will be handled by s.Handler. // // grpcListener is closed when the HTTP server is shutdown. grpcListener, err := gmux.ConfigureServer(server, nil) if err != nil { return nil, err } return &httpServer{server, cfg, logger, grpcListener, listener}, nil } func (h *httpServer) start() error { if h.cfg.RumConfig.Enabled { h.logger.Info("RUM endpoints enabled!") for _, s := range h.cfg.RumConfig.AllowOrigins { if s == "*" { h.logger.Warn("CORS related setting `apm-server.rum.allow_origins` allows all origins. Consider more restrictive setting for production use.") break } } } else { h.logger.Info("RUM endpoints disabled.") } if h.cfg.TLS.IsEnabled() { h.logger.Info("SSL enabled.") return h.ServeTLS(h.httpListener, "", "") } if h.cfg.AgentAuth.SecretToken != "" { h.logger.Warn("Secret token is set, but SSL is not enabled.") } h.logger.Info("SSL disabled.") return h.Serve(h.httpListener) } func (h *httpServer) stop(ctx context.Context) { h.logger.Infof("Stop listening on: %s", h.Server.Addr) if err := h.Shutdown(ctx); err != nil { h.logger.Errorf("error stopping http server: %s", err.Error()) if err := h.Close(); err != nil { h.logger.Errorf("error closing http server: %s", err.Error()) } } } // listen starts the listener for bt.config.Host. func listen(cfg *config.Config, logger *logp.Logger) (net.Listener, error) { var listener net.Listener url, err := url.Parse(cfg.Host) if err == nil && url.Scheme == "unix" { // SO_REUSEPORT does not support unix sockets listener, err = net.Listen("unix", url.Path) } else { addr := cfg.Host if _, _, err := net.SplitHostPort(addr); err != nil { // Tack on a port if SplitHostPort fails on what should be a // tcp network address. If splitting failed because there were // already too many colons, one more won't change that. addr = net.JoinHostPort(addr, config.DefaultPort) } listener, err = reuseport.Listen("tcp", addr) } if err != nil { return nil, err } addr := listener.Addr() if network := addr.Network(); network == "tcp" { logger.Infof("Listening on: %s", addr) } else { logger.Infof("Listening on: %s:%s", network, addr.String()) } if cfg.MaxConnections > 0 { logger.Infof("Connection limit set to: %d", cfg.MaxConnections) listener = netutil.LimitListener(listener, cfg.MaxConnections) } return listener, nil } func doNotTrace(req *http.Request) bool { // Don't trace root url (healthcheck) requests. return req.URL.Path == api.RootPath } // newErrorLog returns a standard library log.Logger that sends // logs to logger with error level. func newErrorLog(logger *logp.Logger) *log.Logger { logger = logger.Named("http") logger = logger.WithOptions(zap.AddCallerSkip(3)) w := errorLogWriter{logger} return log.New(w, "", 0) } type errorLogWriter struct { logger *logp.Logger } func (w errorLogWriter) Write(p []byte) (int, error) { message := strings.TrimSpace(string(p)) w.logger.Error(message) return len(p), nil }