internal/beater/server.go (155 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package beater
import (
	"context"
	"errors"
	"net"
	"net/http"

	"go.elastic.co/apm/module/apmgorilla/v2"
	"go.elastic.co/apm/v2"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"

	"github.com/elastic/beats/v7/libbeat/version"
	"github.com/elastic/elastic-agent-libs/logp"

	"github.com/elastic/apm-data/input"
	"github.com/elastic/apm-data/model/modelpb"
	"github.com/elastic/apm-data/model/modelprocessor"
	"github.com/elastic/apm-server/internal/agentcfg"
	"github.com/elastic/apm-server/internal/beater/api"
	"github.com/elastic/apm-server/internal/beater/auth"
	"github.com/elastic/apm-server/internal/beater/config"
	"github.com/elastic/apm-server/internal/beater/otlp"
	"github.com/elastic/apm-server/internal/beater/ratelimit"
	"github.com/elastic/apm-server/internal/elasticsearch"
	"github.com/elastic/apm-server/internal/kibana"
	"github.com/elastic/apm-server/internal/sourcemap"
)
// WrapServerFunc is a function for injecting behaviour into ServerParams
// and RunServerFunc.
//
// WrapServerFunc may modify ServerParams, for example by wrapping the
// BatchProcessor with additional processors. Similarly, WrapServerFunc
// may wrap the RunServerFunc to run additional goroutines along with the
// server.
//
// WrapServerFunc may keep a reference to the provided ServerParams'
// BatchProcessor for asynchronous event publication, such as for
// aggregated metrics. All other events (i.e. those decoded from
// agent payloads) should be sent to the BatchProcessor in the
// ServerParams provided to RunServerFunc; this BatchProcessor will
// have rate-limiting, authorization, and data preprocessing applied.
//
// It returns the (possibly modified) ServerParams and RunServerFunc,
// or a non-nil error.
type WrapServerFunc func(ServerParams, RunServerFunc) (ServerParams, RunServerFunc, error)
// RunServerFunc is a function which runs the APM Server, configured by
// the given ServerParams, until a fatal error occurs or the context is
// cancelled.
type RunServerFunc func(context.Context, ServerParams) error
// ServerParams holds parameters for running the APM Server.
type ServerParams struct {
// Config is the configuration used for running the APM Server.
Config *config.Config
// Namespace holds the data stream namespace for the server.
Namespace string
// Logger is the logger for the beater component.
Logger *logp.Logger
// Tracer is an apm.Tracer that the APM Server may use
// for self-instrumentation.
Tracer *apm.Tracer
// TracerProvider holds an OpenTelemetry trace.TracerProvider.
TracerProvider trace.TracerProvider
// MeterProvider holds an OpenTelemetry metric.MeterProvider.
MeterProvider metric.MeterProvider
// Authenticator holds an authenticator that can be used for
// authenticating clients, and obtaining authentication details
// and an auth.Authorizer for authorizing the client for future
// actions on resources.
Authenticator *auth.Authenticator
// RateLimitStore holds an IP-based rate-limiter LRU cache.
RateLimitStore *ratelimit.Store
// SourcemapFetcher holds a sourcemap.Fetcher, or nil if source
// mapping is disabled.
SourcemapFetcher sourcemap.Fetcher
// AgentConfig holds an interface for fetching agent configuration.
AgentConfig agentcfg.Fetcher
// BatchProcessor is the modelpb.BatchProcessor that is used
// for publishing events to the output, such as Elasticsearch.
BatchProcessor modelpb.BatchProcessor
// PublishReady holds a channel which will be signalled when the server
// is ready to publish events. Readiness means that preconditions for
// event publication have been met, including license checks for some
// features and waiting for the Fleet integration to be installed
// when running in standalone mode.
//
// Even if the server is not ready to publish events, it will still
// accept events and enqueue them for later publication.
PublishReady <-chan struct{}
// KibanaClient holds a Kibana client if the server has Kibana
// configuration. If the server has no Kibana configuration, this
// field will be nil.
KibanaClient *kibana.Client
// NewElasticsearchClient returns an elasticsearch.Client for cfg.
//
// This must be used whenever an elasticsearch client might be used
// for indexing. Under some configuration, the server will wrap the
// client's transport such that requests will be blocked until data
// streams have been initialised.
NewElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error)
// GRPCServer holds a *grpc.Server to which services will be registered
// for receiving data, configuration requests, etc.
//
// The gRPC server is configured with various interceptors, including
// authentication/authorization, logging, metrics, and tracing.
// See package internal/beater/interceptors for details.
GRPCServer *grpc.Server
// Semaphore holds a shared semaphore used to limit the number of
// concurrently running requests.
Semaphore input.Semaphore
}
// newBaseRunServer returns the base RunServerFunc: one that builds a server
// from the ServerParams it receives, bound to listener, and then runs that
// server with the supplied context. A failure to build the server is
// returned without running anything.
func newBaseRunServer(listener net.Listener) RunServerFunc {
	var runServer RunServerFunc = func(ctx context.Context, params ServerParams) error {
		srv, err := newServer(params, listener)
		if err == nil {
			err = srv.run(ctx)
		}
		return err
	}
	return runServer
}
// server bundles the HTTP and gRPC servers that are started and stopped
// together by run.
type server struct {
// logger is used by run to log server start and stop.
logger *logp.Logger
// cfg supplies ShutdownTimeout, which run uses when stopping the HTTP server.
cfg *config.Config
// httpServer serves HTTP requests; run also passes its grpcListener
// to grpcServer.Serve.
httpServer *httpServer
// grpcServer is the gRPC server taken from ServerParams.GRPCServer.
grpcServer *grpc.Server
}
// newServer assembles a server from args: an HTTP server bound to listener
// for Elastic APM agent requests, plus the OTLP services registered on
// args.GRPCServer.
//
// An error is returned if either the HTTP router or the HTTP server cannot
// be created; in that case nothing is registered on the gRPC server.
func newServer(args ServerParams, listener net.Listener) (server, error) {
	// isPublishReady performs a non-blocking receive on args.PublishReady
	// and reports whether that receive succeeded.
	isPublishReady := func() bool {
		select {
		case <-args.PublishReady:
			return true
		default:
		}
		return false
	}

	// Build the router that serves Elastic APM agent requests over HTTP.
	mux, err := api.NewMux(
		args.Config,
		args.BatchProcessor,
		args.Authenticator,
		args.AgentConfig,
		args.RateLimitStore,
		args.SourcemapFetcher,
		isPublishReady,
		args.Semaphore,
		args.MeterProvider,
		args.Logger,
	)
	if err != nil {
		return server{}, err
	}
	// Instrument the router with args.Tracer, passing doNotTrace as the
	// request ignorer.
	apmgorilla.Instrument(
		mux,
		apmgorilla.WithRequestIgnorer(doNotTrace),
		apmgorilla.WithTracer(args.Tracer),
	)

	httpSrv, err := newHTTPServer(args.Logger, args.Config, mux, listener)
	if err != nil {
		return server{}, err
	}

	// OTLP events go to the shared batch processor. When augmentation is
	// enabled, a model processor that sets `client.ip` for events from
	// end-user devices runs first.
	otlpProcessor := args.BatchProcessor
	if args.Config.AugmentEnabled {
		otlpProcessor = modelprocessor.Chained{
			modelpb.ProcessBatchFunc(otlp.SetClientMetadata),
			args.BatchProcessor,
		}
	}
	otlpLogger := zap.New(args.Logger.Core(), zap.WithCaller(true))
	otlp.RegisterGRPCServices(args.GRPCServer, otlpLogger, otlpProcessor, args.Semaphore, args.MeterProvider)

	srv := server{
		logger:     args.Logger,
		cfg:        args.Config,
		httpServer: httpSrv,
		grpcServer: args.GRPCServer,
	}
	return srv, nil
}
// run serves HTTP and gRPC traffic until ctx is cancelled or one of the
// servers fails.
//
// Three goroutines run in an errgroup: the HTTP server, the gRPC server
// (accepting connections from the HTTP server's grpcListener), and a
// watcher that shuts both servers down once the group's context is done.
// That context is cancelled when ctx is cancelled or when any goroutine
// returns a non-nil error, so a failure of one server also stops the other.
//
// run returns nil on a clean shutdown, which includes the case where the
// first error reported by the group is, or wraps, http.ErrServerClosed.
// Any other error is returned unchanged.
func (s server) run(ctx context.Context) error {
	s.logger.Infof("Starting apm-server [%s built %s]. Hit CTRL-C to stop it.", version.Commit(), version.BuildTime())
	defer s.logger.Infof("Server stopped")

	g, ctx := errgroup.WithContext(ctx)
	g.Go(s.httpServer.start)
	g.Go(func() error {
		return s.grpcServer.Serve(s.httpServer.grpcListener)
	})
	g.Go(func() error {
		<-ctx.Done()
		// NOTE(review): GracefulStop is called without a deadline, so
		// ShutdownTimeout below only bounds the HTTP server stop — confirm
		// this is intended.
		s.grpcServer.GracefulStop()
		// ctx is already done at this point, so the stop deadline is derived
		// from a fresh background context.
		stopctx, cancel := context.WithTimeout(context.Background(), s.cfg.ShutdownTimeout)
		defer cancel()
		s.httpServer.stop(stopctx)
		return nil
	})

	// Use errors.Is rather than != so that a wrapped http.ErrServerClosed is
	// still recognised as a clean shutdown.
	if err := g.Wait(); !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}
// newAgentConfigFetcher builds the agent configuration fetcher.
//
// The primary fetcher is always Elasticsearch-backed, using a client
// obtained from newElasticsearchClient for cfg.AgentConfig.ESConfig. A
// Kibana-backed fallback is attached only when agent config Elasticsearch
// has not been explicitly configured and kibanaClient is non-nil; in every
// other case there is no fallback.
//
// It returns the fetcher wrapped in agentcfg.SanitizingFetcher, together
// with the Elasticsearch fetcher's Run method (presumably to be invoked by
// the caller to drive the fetcher — confirm against the call site). An
// error is returned if the Kibana fetcher or the Elasticsearch client
// cannot be created. ctx is not used by this function.
func newAgentConfigFetcher(
	ctx context.Context,
	cfg *config.Config,
	kibanaClient *kibana.Client,
	newElasticsearchClient func(*elasticsearch.Config) (*elasticsearch.Client, error),
	tracer *apm.Tracer,
	mp metric.MeterProvider,
	logger *logp.Logger,
) (agentcfg.Fetcher, func(context.Context) error, error) {
	// fallback stays nil unless the Kibana fetcher below is created.
	var fallback agentcfg.Fetcher
	if !cfg.AgentConfig.ESOverrideConfigured && kibanaClient != nil {
		kibanaFetcher, err := agentcfg.NewKibanaFetcher(kibanaClient, cfg.AgentConfig.Cache.Expiration, logger)
		if err != nil {
			return nil, nil, err
		}
		fallback = kibanaFetcher
	}

	client, err := newElasticsearchClient(cfg.AgentConfig.ESConfig)
	if err != nil {
		return nil, nil, err
	}

	fetcher := agentcfg.NewElasticsearchFetcher(
		client,
		cfg.AgentConfig.Cache.Expiration,
		fallback,
		tracer,
		mp,
		logger,
	)
	return agentcfg.SanitizingFetcher{Fetcher: fetcher}, fetcher.Run, nil
}