systemtest/apmservertest/server.go (499 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package apmservertest import ( "bytes" "context" "crypto/ecdsa" "crypto/elliptic" "crypto/rand" "crypto/sha256" "crypto/tls" "crypto/x509" "crypto/x509/pkix" "encoding/json" "encoding/pem" "errors" "fmt" "io" "math/big" "net" "net/http" "net/url" "os" "os/exec" "strings" "sync" "testing" "time" "go.elastic.co/apm/v2" "go.elastic.co/apm/v2/transport" "go.uber.org/zap/zapcore" ) // Server is an APM server listening on a system-chosen port on the local // loopback interface, for use in end-to-end tests. type Server struct { // Config holds configuration for apm-server, which will be passed // to the apm-server command. // // Config will be initialised with DefaultConfig, and may be changed // any time until Start is called. Config Config // Dir is the working directory of the server. // // If Dir is empty when Start is called on an unstarted server, // then it will be set to a temporary directory, in which an // empty apm-server.yml, and the pipeline definition, will be created. // The temporary directory will be removed when the server is closed. Dir string // CertDir is the directory of the TLS certificates. // // If Dir is empty the current directory is used CertDir string // Log holds an optional io.Writer to which the process's Stderr will // be written, in addition to being available through the Server.Logs // field. // // Log is set by NewServerTB and NewUnstartedServerTB, and will be nil // for calls to NewUnstartedServer. Callers of NewUnstartedServer may // set Log prior to calling Start. Log io.Writer // BeatUUID will be populated with the server's Beat UUID after Start // returns successfully. This can be used to search for documents // corresponding to this test server instance. BeatUUID string // Version will be populated with the servers' version number after // Start returns successfully. Version string // Logs provides access to the apm-server log entries. Logs LogEntries // Stderr holds the stderr for apm-server, excluding logging. Stderr io.ReadCloser // URL holds the base URL for Elastic APM agents, in the form // http[s]://ipaddr:port with no trailing slash. URL string // TLS is optional TLS client configuration, populated with a new config // after TLS is started. TLS *tls.Config // EventMetadataFilter holds an optional EventMetadataFilter, which // can modify event metadata before it is sent to the server. // // New(Unstarted)Server sets a default filter which removes or // replaces environment-specific properties such as host name, // container ID, etc., to enable repeatable tests across different // test environments. EventMetadataFilter EventMetadataFilter args []string cmd *ServerCmd mu sync.Mutex tracers []*apm.Tracer closeCh chan struct{} } // NewServerTB returns a started Server, passings args to the apm-server command. // The server's Close method will be called when the test ends, and logs will be // written under apm-server/systemtest/logs/<test-name>/. func NewServerTB(tb testing.TB, args ...string) *Server { s := NewUnstartedServerTB(tb, args...) if err := s.Start(); err != nil { tb.Fatal(err) } return s } // NewUnstartedServerTB returns an unstarted Server, passing args to the apm-server // command. The server's Close method will be called when the test ends, and logs // will be written under apm-server/systemtest/logs/<test-name>/. func NewUnstartedServerTB(tb testing.TB, args ...string) *Server { s := NewUnstartedServer(args...) s.CertDir = tb.TempDir() logfile := createLogfile(tb, "apm-server") s.Log = logfile tb.Cleanup(func() { defer logfile.Close() if tb.Failed() { tb.Logf("log file: %s", logfile.Name()) } // Call the server's Close method in a background goroutine, // and wait for up to 10 seconds for it to complete. errc := make(chan error) go func() { errc <- s.Close() }() select { case err := <-errc: if err != nil { tb.Error(err) } close(errc) case <-time.After(10 * time.Second): // Channel receive on errc never happened. Start up a // goroutine to receive on errc and then clean up the // associated resources. go func() { <-errc; close(errc) }() } }) return s } // NewUnstartedServer returns an unstarted Server, passing args to the apm-server // command. The server's Close method must be called to clean up any resources // created by Start. func NewUnstartedServer(args ...string) *Server { return &Server{ Config: DefaultConfig(), EventMetadataFilter: DefaultMetadataFilter{}, args: args, closeCh: make(chan struct{}), } } // Start starts a server from NewUnstartedServer, waiting for it to start // listening for requests. // // Start will have set s.URL upon a successful return. func (s *Server) Start() error { return s.start(false) } func (s *Server) StartTLS() error { return s.start(true) } func (s *Server) start(tls bool) error { if s.URL != "" { panic("Server already started") } s.Logs.init() extra := map[string]interface{}{ // These are config attributes that we always specify, // as the testing framework relies on them being set. "logging.level": "debug", "logging.to_stderr": true, "apm-server.expvar.enabled": true, "apm-server.host": "127.0.0.1:0", } if tls { certPath, keyPath, caCertPath, err := s.initTLS() if err != nil { panic(err) } extra["apm-server.ssl.certificate"] = certPath extra["apm-server.ssl.key"] = keyPath if s.Config.TLS != nil && s.Config.TLS.ClientAuthentication != "" { extra["apm-server.ssl.certificate_authorities"] = []string{caCertPath} } } cfgargs, err := configArgs(s.Config, extra) if err != nil { return err } args := append(cfgargs, s.args...) args = append(args, "--path.home", ".") // working directory, s.Dir s.cmd = ServerCommand(context.Background(), "run", args...) s.cmd.Dir = s.Dir // This speeds up tests by forcing the self-instrumentation // event streams to be closed after 100ms. This is only necessary // because processor/stream waits for the stream to be closed // before the last batch is processed. // // TODO(axw) remove this once the server processes batches without // waiting for the stream to be closed. s.cmd.Env = append(os.Environ(), "ELASTIC_APM_API_REQUEST_TIME=100ms") stderr, err := s.cmd.StderrPipe() if err != nil { return err } if err := s.cmd.Start(); err != nil { stderr.Close() return err } s.Dir = s.cmd.Dir // Consume the process's stderr. var stderrReader io.Reader = stderr if s.Log != nil { // Write the apm-server command line to the top of the log. s.printCmdline(s.Log, args) stderrReader = io.TeeReader(stderrReader, s.Log) } stderrPipeReader, stderrPipeWriter := io.Pipe() s.Stderr = stderrPipeReader go s.consumeStderr(stderrReader, stderrPipeWriter) logs := s.Logs.Iterator() defer logs.Close() if err := s.waitUntilListening(tls, logs); err != nil { return err } return nil } func (s *Server) initTLS() (serverCertPath, serverKeyPath, caCertPath string, _ error) { serverCertPath, serverKeyPath, err := generateCerts(s.CertDir, true, x509.ExtKeyUsageServerAuth, "127.0.0.1", "::1") // Load a self-signed server certificate for testing TLS encryption. serverCertBytes, err := os.ReadFile(serverCertPath) if err != nil { return "", "", "", err } certpool := x509.NewCertPool() if !certpool.AppendCertsFromPEM(serverCertBytes) { panic("failed to add CA certificate to cert pool") } clientCertPath, clientKeyPath, err := generateCerts(s.CertDir, false, x509.ExtKeyUsageClientAuth, "") if err != nil { return "", "", "", err } // Load a self-signed client certificate for testing TLS client certificate auth. clientCertBytes, err := os.ReadFile(clientCertPath) if err != nil { return "", "", "", err } clientKeyBytes, err := os.ReadFile(clientKeyPath) if err != nil { return "", "", "", err } clientCert, err := tls.X509KeyPair(clientCertBytes, clientKeyBytes) if err != nil { return "", "", "", err } s.TLS = &tls.Config{ Certificates: []tls.Certificate{clientCert}, RootCAs: certpool, } return serverCertPath, serverKeyPath, clientCertPath, nil } func generateCerts(dir string, ca bool, keyUsage x509.ExtKeyUsage, hosts ...string) (string, string, error) { serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) if err != nil { return "", "", fmt.Errorf("Failed to generate serial number: %w", err) } notBefore := time.Now() notAfter := notBefore.Add(24 * time.Hour) template := x509.Certificate{ SerialNumber: serialNumber, Subject: pkix.Name{ Organization: []string{"Org"}, }, NotBefore: notBefore, NotAfter: notAfter, KeyUsage: x509.KeyUsageDigitalSignature, ExtKeyUsage: []x509.ExtKeyUsage{keyUsage}, BasicConstraintsValid: true, } for _, h := range hosts { if ip := net.ParseIP(h); ip != nil { template.IPAddresses = append(template.IPAddresses, ip) } } if ca { template.IsCA = true template.KeyUsage |= x509.KeyUsageCertSign } clientKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) if err != nil { return "", "", fmt.Errorf("failed to generate client key: %w", err) } privBytes, err := x509.MarshalPKCS8PrivateKey(clientKey) if err != nil { return "", "", fmt.Errorf("unable to marshal private key: %w", err) } h := sha256.Sum256(privBytes) template.SubjectKeyId = h[:] derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, clientKey.Public(), clientKey) if err != nil { return "", "", fmt.Errorf("failed to create certificate: %w", err) } certOut, err := os.CreateTemp(dir, "client_cert.pem") if err != nil { return "", "", fmt.Errorf("failed to open client_cert.pem for writing: %w", err) } if err := pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil { return "", "", fmt.Errorf("failed to write data to client_cert.pem: %w", err) } if err := certOut.Close(); err != nil { return "", "", fmt.Errorf("error closing client_cert.pem: %w", err) } keyOut, err := os.CreateTemp(dir, "client_key.pem") if err != nil { return "", "", fmt.Errorf("failed to open client_key.pem for writing: %w", err) } if err := pem.Encode(keyOut, &pem.Block{Type: "PRIVATE KEY", Bytes: privBytes}); err != nil { return "", "", fmt.Errorf("failed to write data to client_key.pem: %w", err) } if err := keyOut.Close(); err != nil { return "", "", fmt.Errorf("error closing client_key.pem: %w", err) } return certOut.Name(), keyOut.Name(), nil } func (s *Server) printCmdline(w io.Writer, args []string) { var buf bytes.Buffer fmt.Fprint(&buf, "# Running apm-server\n") for i := 0; i < len(args); i += 2 { fmt.Fprintf(&buf, "# \t") if args[i] == "-E" && i+1 < len(args) { fmt.Fprintf(&buf, "%s %s\n", args[i], args[i+1]) } else { fmt.Fprintf(&buf, "%s\n", strings.Join(args[i:], " ")) break } } if _, err := buf.WriteTo(w); err != nil { panic(err) } } func (s *Server) waitUntilListening(tls bool, logs *LogEntryIterator) error { // First wait for the Beat UUID and server version to be logged. for entry := range logs.C() { if entry.Level != zapcore.InfoLevel || (entry.Message != "Beat info" && entry.Message != "Build info") { continue } systemInfo, ok := entry.Fields["system_info"].(map[string]interface{}) if !ok { continue } for k, info := range systemInfo { switch k { case "beat": beatInfo := info.(map[string]interface{}) s.BeatUUID = beatInfo["uuid"].(string) case "build": buildInfo := info.(map[string]interface{}) s.Version = buildInfo["version"].(string) } } if s.BeatUUID != "" && s.Version != "" { break } } var elasticHTTPListeningAddr string for entry := range logs.C() { if entry.Level != zapcore.InfoLevel { continue } sep := strings.LastIndex(entry.Message, ": ") if sep == -1 { continue } prefix, addr := entry.Message[:sep], strings.TrimSpace(entry.Message[sep+1:]) if prefix != "Listening on" { continue } if _, _, err := net.SplitHostPort(addr); err != nil { return fmt.Errorf("invalid listening address %q: %w", addr, err) } elasticHTTPListeningAddr = addr break } if elasticHTTPListeningAddr != "" { urlScheme := "http" if tls { urlScheme = "https" } s.URL = (&url.URL{Scheme: urlScheme, Host: elasticHTTPListeningAddr}).String() return nil } // Didn't find message, server probably exited... if err := s.Close(); err != nil { if err, ok := err.(*exec.ExitError); ok && err != nil { stderr, _ := io.ReadAll(s.Stderr) err.Stderr = stderr } return err } return errors.New("server exited cleanly without logging expected startup message") } // consumeStderr consumes the apm-server process's stderr, recording // log entries. After any errors occur decoding log entries, remaining // stderr is available through s.Stderr. func (s *Server) consumeStderr(procStderr io.Reader, out *io.PipeWriter) { type logEntry struct { Timestamp logpTimestamp `json:"@timestamp"` Message string `json:"message"` Level zapcore.Level `json:"log.level"` Logger string `json:"log.logger"` Origin struct { File string `json:"file.name"` Line int `json:"file.line"` } `json:"log.origin"` } decoder := json.NewDecoder(procStderr) for { var raw json.RawMessage if err := decoder.Decode(&raw); err != nil { break } var entry logEntry if err := json.Unmarshal(raw, &entry); err != nil { break } var fields map[string]interface{} if err := json.Unmarshal(raw, &fields); err != nil { break } delete(fields, "@timestamp") delete(fields, "log.level") delete(fields, "log.logger") delete(fields, "log.origin") delete(fields, "message") s.Logs.add(LogEntry{ Timestamp: time.Time(entry.Timestamp), Logger: entry.Logger, Level: entry.Level, File: entry.Origin.File, Line: entry.Origin.Line, Message: entry.Message, Fields: fields, }) } s.Logs.close() // Send the remaining stderr to s.Stderr. procStderr = io.MultiReader(decoder.Buffered(), procStderr) _, err := io.Copy(out, procStderr) out.CloseWithError(err) } // Close shuts down the server gracefully if possible, and forcefully otherwise. // // Close must be called in order to clean up any resources created for running // the server. Calling Close on an unstarted server is a no-op. func (s *Server) Close() error { select { case <-s.closeCh: return nil default: close(s.closeCh) } if s.cmd == nil { return nil } s.closeTracers() if s.cmd.Process == nil { return errors.New("apm server process not started") } if err := interruptProcess(s.cmd.Process); err != nil { s.cmd.Process.Kill() } if err := s.Wait(); err != nil { return err } // close stderr so that the consumeStderr goroutine // exits s.Stderr.Close() return nil } func (s *Server) closeTracers() { s.mu.Lock() defer s.mu.Unlock() for _, tracer := range s.tracers { tracer.Close() } s.tracers = nil } // Kill forcefully shuts down the server. func (s *Server) Kill() error { if s.cmd != nil { s.cmd.Process.Kill() } return s.Wait() } // Wait waits for the server to exit. // // Wait waits up to 10 seconds for the process's stderr to be closed, // and then waits for the process to exit. func (s *Server) Wait() error { if s.cmd == nil { return errors.New("apm-server not started") } logs := s.Logs.Iterator() defer logs.Close() deadline := time.After(10 * time.Second) for { select { case _, ok := <-logs.C(): if !ok { return s.cmd.Wait() } case <-deadline: return s.cmd.Wait() } } } // Tracer returns a new apm.Tracer, configured with the server's URL and secret // token if any. This must only be called after the server has been started. // // The Tracer will be closed when the server is closed. func (s *Server) Tracer() *apm.Tracer { serverURL, err := url.Parse(s.URL) if err != nil { panic(err) } httpTransport, err := transport.NewHTTPTransport(transport.HTTPTransportOptions{ ServerURLs: []*url.URL{serverURL}, SecretToken: s.Config.AgentAuth.SecretToken, TLSClientConfig: s.TLS, }) if err != nil { panic(err) } opts := apm.TracerOptions{Transport: httpTransport} if s.EventMetadataFilter != nil { opts.Transport = NewFilteringTransport(httpTransport, s.EventMetadataFilter) } tracer, err := apm.NewTracerOptions(opts) if err != nil { panic(err) } s.mu.Lock() defer s.mu.Unlock() s.tracers = append(s.tracers, tracer) return tracer } // GetExpvar queries the server's /debug/vars endpoint, parsing the response // into an Expvar structure. func (s *Server) GetExpvar() *Expvar { resp, err := http.Get(s.URL + "/debug/vars") if err != nil { panic(err) } defer resp.Body.Close() expvar, err := decodeExpvar(resp.Body) if err != nil { panic(err) } return expvar } // WaitForPublishReady polls the server's "GET /" endpoint, waiting for it to // indicate it is in the publish-ready state, or for the context to be canceled. func (s *Server) WaitForPublishReady(ctx context.Context) error { ticker := time.NewTicker(50 * time.Millisecond) defer ticker.Stop() for { select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: ready, err := s.isPublishReady() if err != nil { // Errors are not expected, as the server // should be operational by the time Start // returns. return err } if ready { return nil } } } } func (s *Server) isPublishReady() (bool, error) { req, err := http.NewRequest(http.MethodGet, s.URL+"/", nil) if err != nil { return false, err } if secretToken := s.Config.AgentAuth.SecretToken; secretToken != "" { req.Header.Set("Authorization", "Bearer "+secretToken) } resp, err := http.DefaultClient.Do(req) if err != nil { return false, err } defer resp.Body.Close() var apmResp struct { BuildDate string `json:"build_date"` BuildSHA string `json:"build_sha"` PublishReady bool `json:"publish_ready"` Version string `json:"version"` } if err := json.NewDecoder(resp.Body).Decode(&apmResp); err != nil { return false, err } return apmResp.PublishReady, nil }