internal/beater/api/mux.go (291 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package api import ( "net/http" httppprof "net/http/pprof" "regexp" "runtime/pprof" "github.com/gorilla/mux" "github.com/pkg/errors" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/apm-data/input" "github.com/elastic/apm-data/input/elasticapm" "github.com/elastic/apm-data/model/modelpb" "github.com/elastic/apm-data/model/modelprocessor" "github.com/elastic/apm-server/internal/agentcfg" "github.com/elastic/apm-server/internal/beater/api/config/agent" "github.com/elastic/apm-server/internal/beater/api/intake" "github.com/elastic/apm-server/internal/beater/api/root" "github.com/elastic/apm-server/internal/beater/auth" "github.com/elastic/apm-server/internal/beater/config" "github.com/elastic/apm-server/internal/beater/middleware" "github.com/elastic/apm-server/internal/beater/otlp" "github.com/elastic/apm-server/internal/beater/ratelimit" "github.com/elastic/apm-server/internal/beater/request" "github.com/elastic/apm-server/internal/logs" srvmodelprocessor "github.com/elastic/apm-server/internal/model/modelprocessor" "github.com/elastic/apm-server/internal/sourcemap" "github.com/elastic/apm-server/internal/version" ) const ( // RootPath defines the server's root path RootPath = "/" // Backend routes // AgentConfigPath defines the path to query for agent config management AgentConfigPath = "/config/v1/agents" // IntakePath defines the path to ingest monitored events IntakePath = "/intake/v2/events" // RUM routes // AgentConfigRUMPath defines the path to query for the RUM agent config management AgentConfigRUMPath = "/config/v1/rum/agents" // IntakeRUMPath defines the path to ingest monitored RUM events IntakeRUMPath = "/intake/v2/rum/events" IntakeRUMV3Path = "/intake/v3/rum/events" // OTLPTracesIntakePath defines the path to ingest OpenTelemetry traces (HTTP Collector) OTLPTracesIntakePath = "/v1/traces" // OTLPMetricsIntakePath defines the path to ingest OpenTelemetry metrics (HTTP Collector) OTLPMetricsIntakePath = "/v1/metrics" // OTLPLogsIntakePath defines the path to ingest OpenTelemetry logs (HTTP Collector) OTLPLogsIntakePath = "/v1/logs" ) // NewMux creates a new gorilla/mux router, with routes registered for handling the // APM Server API. func NewMux( beaterConfig *config.Config, batchProcessor modelpb.BatchProcessor, authenticator *auth.Authenticator, fetcher agentcfg.Fetcher, ratelimitStore *ratelimit.Store, sourcemapFetcher sourcemap.Fetcher, publishReady func() bool, semaphore input.Semaphore, meterProvider metric.MeterProvider, logger *logp.Logger, ) (*mux.Router, error) { pool := request.NewContextPool() logger = logger.Named(logs.Handler) router := mux.NewRouter() router.NotFoundHandler = pool.HTTPHandler(notFoundHandler) builder := routeBuilder{ cfg: beaterConfig, authenticator: authenticator, batchProcessor: batchProcessor, ratelimitStore: ratelimitStore, sourcemapFetcher: sourcemapFetcher, intakeSemaphore: semaphore, logger: logger, } zapLogger := zap.New(logger.Core(), zap.WithCaller(true)) builder.intakeProcessor = elasticapm.NewProcessor(elasticapm.Config{ MaxEventSize: beaterConfig.MaxEventSize, Semaphore: semaphore, Logger: zapLogger, }) type route struct { path string handlerFn func() (request.Handler, error) } otlpHandlers := otlp.NewHTTPHandlers(zapLogger, batchProcessor, semaphore, meterProvider) rumIntakeHandler := builder.rumIntakeHandler(meterProvider) routeMap := []route{ {RootPath, builder.rootHandler(publishReady, meterProvider)}, {AgentConfigPath, builder.backendAgentConfigHandler(fetcher, meterProvider)}, {AgentConfigRUMPath, builder.rumAgentConfigHandler(fetcher, meterProvider)}, {IntakeRUMPath, rumIntakeHandler}, {IntakeRUMV3Path, rumIntakeHandler}, {IntakePath, builder.backendIntakeHandler("apm-server.server.", meterProvider)}, {OTLPTracesIntakePath, builder.otlpHandler(otlpHandlers.HandleTraces, "apm-server.otlp.http.traces.", meterProvider)}, {OTLPMetricsIntakePath, builder.otlpHandler(otlpHandlers.HandleMetrics, "apm-server.otlp.http.metrics.", meterProvider)}, {OTLPLogsIntakePath, builder.otlpHandler(otlpHandlers.HandleLogs, "apm-server.otlp.http.logs.", meterProvider)}, } for _, route := range routeMap { h, err := route.handlerFn() if err != nil { return nil, err } logger.Infof("Path %s added to request handler", route.path) router.Handle(route.path, pool.HTTPHandler(h)) } if beaterConfig.Expvar.Enabled { path := beaterConfig.Expvar.URL logger.Infof("Path %s added to request handler", path) router.Handle(path, http.HandlerFunc(debugVarsHandler)) } if beaterConfig.Pprof.Enabled { const path = "/debug/pprof" logger.Infof("Path %s added to request handler", path) pprofRouter := router.PathPrefix(path).Subrouter().StrictSlash(true) pprofRouter.Handle("/", http.HandlerFunc(httppprof.Index)) for _, p := range pprof.Profiles() { pprofRouter.Handle("/"+p.Name(), http.HandlerFunc(httppprof.Index)) } pprofRouter.Handle("/cmdline", http.HandlerFunc(httppprof.Cmdline)) pprofRouter.Handle("/profile", http.HandlerFunc(httppprof.Profile)) pprofRouter.Handle("/symbol", http.HandlerFunc(httppprof.Symbol)) pprofRouter.Handle("/trace", http.HandlerFunc(httppprof.Trace)) } return router, nil } type routeBuilder struct { cfg *config.Config authenticator *auth.Authenticator batchProcessor modelpb.BatchProcessor ratelimitStore *ratelimit.Store sourcemapFetcher sourcemap.Fetcher intakeProcessor *elasticapm.Processor intakeSemaphore input.Semaphore logger *logp.Logger } func (r *routeBuilder) backendIntakeHandler(metricsPrefix string, mp metric.MeterProvider) func() (request.Handler, error) { return func() (request.Handler, error) { h := intake.Handler(mp, r.intakeProcessor, backendRequestMetadataFunc(r.cfg), r.batchProcessor) return middleware.Wrap(h, backendMiddleware(r.cfg, r.authenticator, r.ratelimitStore, metricsPrefix, mp, r.logger)...) } } func (r *routeBuilder) otlpHandler(handler http.HandlerFunc, metricsPrefix string, mp metric.MeterProvider) func() (request.Handler, error) { return func() (request.Handler, error) { h := func(c *request.Context) { handler(c.ResponseWriter, c.Request) } return middleware.Wrap(h, backendMiddleware(r.cfg, r.authenticator, r.ratelimitStore, metricsPrefix, mp, r.logger)...) } } func (r *routeBuilder) rumIntakeHandler(mp metric.MeterProvider) func() (request.Handler, error) { return func() (request.Handler, error) { var batchProcessors modelprocessor.Chained // The order of these processors is important. Source mapping must happen before identifying library frames, or // frames to exclude from error grouping; identifying library frames must happen before updating the error culprit. if r.sourcemapFetcher != nil { batchProcessors = append(batchProcessors, sourcemap.BatchProcessor{ Fetcher: r.sourcemapFetcher, Timeout: r.cfg.RumConfig.SourceMapping.Timeout, Logger: r.logger.Named(logs.Stacktrace), }) } if r.cfg.RumConfig.LibraryPattern != "" { re, err := regexp.Compile(r.cfg.RumConfig.LibraryPattern) if err != nil { return nil, errors.Wrap(err, "invalid library pattern regex") } batchProcessors = append(batchProcessors, srvmodelprocessor.SetLibraryFrame{Pattern: re}) } if r.cfg.RumConfig.ExcludeFromGrouping != "" { re, err := regexp.Compile(r.cfg.RumConfig.ExcludeFromGrouping) if err != nil { return nil, errors.Wrap(err, "invalid exclude from grouping regex") } batchProcessors = append(batchProcessors, srvmodelprocessor.SetExcludeFromGrouping{Pattern: re}) } if r.sourcemapFetcher != nil { batchProcessors = append(batchProcessors, modelprocessor.SetCulprit{}) } batchProcessors = append(batchProcessors, r.batchProcessor) // r.batchProcessor always goes last h := intake.Handler(mp, r.intakeProcessor, rumRequestMetadataFunc(r.cfg), batchProcessors) return middleware.Wrap(h, rumMiddleware(r.cfg, r.authenticator, r.ratelimitStore, "apm-server.server.", mp, r.logger)...) } } func (r *routeBuilder) rootHandler(publishReady func() bool, mp metric.MeterProvider) func() (request.Handler, error) { return func() (request.Handler, error) { h := root.Handler(root.HandlerConfig{ Version: version.VersionWithQualifier(), PublishReady: publishReady, }) return middleware.Wrap(h, rootMiddleware(r.cfg, r.authenticator, mp, r.logger)...) } } func (r *routeBuilder) backendAgentConfigHandler(f agentcfg.Fetcher, mp metric.MeterProvider) func() (request.Handler, error) { return func() (request.Handler, error) { return agentConfigHandler(r.cfg, r.authenticator, r.ratelimitStore, backendMiddleware, f, mp, r.logger) } } func (r *routeBuilder) rumAgentConfigHandler(f agentcfg.Fetcher, mp metric.MeterProvider) func() (request.Handler, error) { return func() (request.Handler, error) { return agentConfigHandler(r.cfg, r.authenticator, r.ratelimitStore, rumMiddleware, f, mp, r.logger) } } type middlewareFunc func(*config.Config, *auth.Authenticator, *ratelimit.Store, string, metric.MeterProvider, *logp.Logger) []middleware.Middleware func agentConfigHandler( cfg *config.Config, authenticator *auth.Authenticator, ratelimitStore *ratelimit.Store, middlewareFunc middlewareFunc, f agentcfg.Fetcher, mp metric.MeterProvider, logger *logp.Logger, ) (request.Handler, error) { mw := middlewareFunc(cfg, authenticator, ratelimitStore, "apm-server.acm.", mp, logger) h := agent.NewHandler(f, cfg.AgentConfig.Cache.Expiration, cfg.DefaultServiceEnvironment, cfg.AgentAuth.Anonymous.AllowAgent) return middleware.Wrap(h, mw...) } func apmMiddleware(mp metric.MeterProvider, metricsPrefix string, logger *logp.Logger) []middleware.Middleware { return []middleware.Middleware{ middleware.LogMiddleware(logger), middleware.RecoverPanicMiddleware(), middleware.MonitoringMiddleware(metricsPrefix, mp), } } func backendMiddleware(cfg *config.Config, authenticator *auth.Authenticator, ratelimitStore *ratelimit.Store, metricsPrefix string, mp metric.MeterProvider, logger *logp.Logger) []middleware.Middleware { backendMiddleware := append(apmMiddleware(mp, metricsPrefix, logger), middleware.ResponseHeadersMiddleware(cfg.ResponseHeaders), middleware.AuthMiddleware(authenticator, true), middleware.AnonymousRateLimitMiddleware(ratelimitStore), ) return backendMiddleware } func rumMiddleware(cfg *config.Config, authenticator *auth.Authenticator, ratelimitStore *ratelimit.Store, metricsPrefix string, mp metric.MeterProvider, logger *logp.Logger) []middleware.Middleware { msg := "RUM endpoint is disabled. " + "Configure the `apm-server.rum` section in apm-server.yml to enable ingestion of RUM events. " + "If you are not using the RUM agent, you can safely ignore this error." rumMiddleware := append(apmMiddleware(mp, metricsPrefix, logger), middleware.ResponseHeadersMiddleware(cfg.ResponseHeaders), middleware.ResponseHeadersMiddleware(cfg.RumConfig.ResponseHeaders), middleware.CORSMiddleware(cfg.RumConfig.AllowOrigins, cfg.RumConfig.AllowHeaders), middleware.AuthMiddleware(authenticator, true), middleware.AnonymousRateLimitMiddleware(ratelimitStore), ) return append(rumMiddleware, middleware.KillSwitchMiddleware(cfg.RumConfig.Enabled, msg)) } func rootMiddleware(cfg *config.Config, authenticator *auth.Authenticator, mp metric.MeterProvider, logger *logp.Logger) []middleware.Middleware { return append(apmMiddleware(mp, "apm-server.root.", logger), middleware.ResponseHeadersMiddleware(cfg.ResponseHeaders), middleware.AuthMiddleware(authenticator, false), ) } func baseRequestMetadata(c *request.Context) *modelpb.APMEvent { return &modelpb.APMEvent{ Timestamp: modelpb.FromTime(c.Timestamp), } } func backendRequestMetadataFunc(cfg *config.Config) func(c *request.Context) *modelpb.APMEvent { if !cfg.AugmentEnabled { return baseRequestMetadata } return func(c *request.Context) *modelpb.APMEvent { e := modelpb.APMEvent{ Timestamp: modelpb.FromTime(c.Timestamp), } if c.ClientIP.IsValid() { e.Host = &modelpb.Host{ Ip: []*modelpb.IP{modelpb.Addr2IP(c.ClientIP)}, } } return &e } } func rumRequestMetadataFunc(cfg *config.Config) func(c *request.Context) *modelpb.APMEvent { if !cfg.AugmentEnabled { return baseRequestMetadata } return func(c *request.Context) *modelpb.APMEvent { e := modelpb.APMEvent{ Timestamp: modelpb.FromTime(c.Timestamp), } if c.UserAgent != "" { e.UserAgent = &modelpb.UserAgent{Original: c.UserAgent} } if c.ClientIP.IsValid() { e.Client = &modelpb.Client{Ip: modelpb.Addr2IP(c.ClientIP)} } if c.SourcePort != 0 || c.SourceIP.IsValid() { e.Source = &modelpb.Source{Port: uint32(c.SourcePort)} if c.SourceIP.IsValid() { e.Source.Ip = modelpb.Addr2IP(c.SourceIP) } } if c.SourceNATIP.IsValid() { e.Source.Nat = &modelpb.NAT{Ip: modelpb.Addr2IP(c.SourceNATIP)} } return &e } } func notFoundHandler(c *request.Context) { c.Result.SetDefault(request.IDResponseErrorsNotFound) c.WriteResult() }