internal/beater/beatertest/server.go (154 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package beatertest import ( "context" "errors" "net" "net/http" "net/url" "strings" "testing" "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" "golang.org/x/sync/errgroup" "github.com/elastic/apm-server/internal/beater" agentconfig "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp/logptest" ) // Server runs the core APM Server that, by default, listens on a system-chosen port // on the loopback interface, for use in testing package beater. type Server struct { group *errgroup.Group ctx context.Context cancel context.CancelFunc runner *beater.Runner // Logs holds the server's logs. Logs *observer.ObservedLogs // Client holds an *http.Client that is configured for sending // requests to the server. Client *http.Client // URL holds the base URL of the server. // // This will be set by Start and newServer. URL string } // NewServer returns a new started APM Server with the given configuration. // // The server's Close method will be invoked as a test cleanup. func NewServer(t testing.TB, opts ...option) *Server { srv := NewUnstartedServer(t, opts...) err := srv.Start() require.NoError(t, err) return srv } // NewUnstartedServer returns a new unstarted APM Server with the given options. // // By default the server will send documents to no-op output. To observe documents // as they would be sent to Elasticsearch, use ElasticsearchOutputConfig and pass the // configuration to New(Unstarted)Server. // // The server's Start method should be called to start the server. func NewUnstartedServer(t testing.TB, opts ...option) *Server { core, observedLogs := observer.New(zapcore.DebugLevel) logger := logptest.NewTestingLogger(t, "", zap.WrapCore(func(in zapcore.Core) zapcore.Core { return zapcore.NewTee(in, core) })) options := options{ config: []*agentconfig.C{agentconfig.MustNewConfigFrom(map[string]interface{}{ "apm-server": map[string]interface{}{ "host": "localhost:0", }, })}, meterProvider: noop.NewMeterProvider(), } for _, o := range opts { o(&options) } cfg, err := agentconfig.MergeConfigs(options.config...) require.NoError(t, err) // If no output is defined, use output.null. var outputConfig struct { Output agentconfig.Namespace `config:"output"` } err = cfg.Unpack(&outputConfig) require.NoError(t, err) if !outputConfig.Output.IsSet() { esCfg, _ := ElasticsearchOutputConfig(t) err = cfg.Merge(esCfg) require.NoError(t, err) } runner, err := beater.NewRunner(beater.RunnerParams{ Config: cfg, Logger: logger, WrapServer: options.wrapServer, TracerProvider: options.tracerProvider, MeterProvider: options.meterProvider, }) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) group, ctx := errgroup.WithContext(ctx) srv := &Server{ group: group, ctx: ctx, cancel: cancel, runner: runner, Logs: observedLogs, } t.Cleanup(func() { srv.Close() }) return srv } // Start starts the server running in the background. // // After Start returns without error, s.URL will be set to the server's base URL. func (s *Server) Start() error { exited := make(chan struct{}) s.group.Go(func() error { defer close(exited) return s.runner.Run(s.ctx) }) listenAddr, err := s.waitListenAddr(exited) if err != nil { return err } var transport http.RoundTripper if u, err := url.Parse(listenAddr); err == nil && u.Scheme == "unix" { transport = &http.Transport{ DialContext: func(context.Context, string, string) (net.Conn, error) { return net.Dial("unix", u.Path) }, } s.URL = "http://test-apm-server/" } else { baseURL := &url.URL{Scheme: "http", Host: listenAddr} s.URL = baseURL.String() } s.Client = &http.Client{Transport: transport} return nil } func (s *Server) waitListenAddr(exited <-chan struct{}) (string, error) { deadline := time.After(10 * time.Second) for { for _, entry := range s.Logs.All() { const prefix = "Listening on: " if strings.HasPrefix(entry.Message, prefix) { return entry.Message[len(prefix):], nil } } select { case <-exited: return "", errors.New("server exited before logging expected message") case <-deadline: return "", errors.New("timeout waiting for server to start listening") case <-time.After(10 * time.Millisecond): } } } // Close stops the server. func (s *Server) Close() error { s.cancel() return s.group.Wait() } type options struct { config []*agentconfig.C wrapServer beater.WrapServerFunc tracerProvider trace.TracerProvider meterProvider metric.MeterProvider } type option func(*options) // WithConfig is an option for supplying configuration to the server. // Configuration provided by WithConfig will be merged with the base // configuration: // // apm-server: // host: localhost:0 func WithConfig(cfg ...*agentconfig.C) option { return func(opts *options) { opts.config = append(opts.config, cfg...) } } // WithWrapServer is an option for setting a WrapServerFunc. func WithWrapServer(wrapServer beater.WrapServerFunc) option { return func(opts *options) { opts.wrapServer = wrapServer } } // WithMeterProvider is an option for setting a MeterProvider func WithMeterProvider(mp metric.MeterProvider) option { return func(opts *options) { opts.meterProvider = mp } }