cmd/intake-receiver/main.go (266 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //go:generate bash ../../script/intake-receiver-version.sh package main import ( "bufio" "bytes" "compress/gzip" "compress/zlib" "context" "encoding/json" "errors" "flag" "fmt" "io" "log" "net" "net/http" "os" "os/signal" "path/filepath" "sync" "time" "go.elastic.co/apm/v2/model" ) var maxScannerBufSize = 300 * 1024 // APM Server default func main() { // Ignored flags, they are just here to allow the `intake-receiver` to be // dropped in as a replacement for APM Server. This means, that all the // config options are ignored. flag.String("e", "", "apm-server compatibility option") flag.String("E", "", "apm-server compatibility option") flag.String("httpprof", "", "apm-server compatibility option") var host, folder string flag.StringVar(&host, "host", ":8200", "port that the server will listen to") flag.StringVar(&folder, "folder", "events", "The path where the received intake events will be stored") flag.Parse() // Create a context that will be cancelled when an interrupt is received. ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) defer stop() if err := os.MkdirAll(folder, 0755); err != nil { log.Fatalln(err) } var agentFileMap fileMap defer agentFileMap.m.Range(func(_, v interface{}) bool { if f, ok := v.(*syncFile); ok && f != nil { f.Close() log.Println("closed file", f.Name()) } return true }) rh := requestHandler{ agentFileMap: &agentFileMap, basePath: folder, rootResponse: fmt.Sprintf(`{"publish_ready":true,"version":"%s"}`+"\n", version), bufPool: sync.Pool{New: func() interface{} { return &bytes.Buffer{} }}, bytesBufPool: sync.Pool{New: func() interface{} { return make([]byte, maxScannerBufSize) }}, } mux := http.NewServeMux() mux.Handle("/", rh.rootHandler()) // TODO(marclop): intake/v2/rum and /intake/v3/rum/events are not supported. for _, p := range []string{"/intake/v2/events"} { mux.Handle(p, rh.eventHandler()) } srv := http.Server{ Addr: host, Handler: mux, ReadTimeout: 30 * time.Second, BaseContext: func(l net.Listener) context.Context { return ctx }, } go func() { <-ctx.Done() log.Println("closing http server...") shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := srv.Shutdown(shutdownCtx); err != nil { log.Println(err) } }() log.Println("http server listening for requests on", srv.Addr) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Println(err) } } type requestHandler struct { agentFileMap *fileMap bufPool sync.Pool bytesBufPool sync.Pool basePath string rootResponse string } func (h *requestHandler) rootHandler() http.Handler { return logHandler(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/": rw.WriteHeader(200) rw.Write([]byte(h.rootResponse)) case "/config/v1/agents": // Prevent the APM Agents from logging errors. rw.WriteHeader(200) rw.Write([]byte(`{}`)) default: http.Error( rw, fmt.Sprintf(" %s is not implemented", r.URL.Path), http.StatusNotImplemented, ) } })) } func (h *requestHandler) eventHandler() http.Handler { return logHandler( http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { code, err := h.handleRequest(r) if err != nil { log.Println("failed handling request", code, err.Error()) http.Error(rw, err.Error(), code) return } rw.WriteHeader(code) }), ) } func (h *requestHandler) handleRequest(r *http.Request) (int, error) { buf := h.bufPool.Get().(*bytes.Buffer) defer func() { buf.Reset() h.bufPool.Put(buf) }() var err error body := r.Body encoding := r.Header.Get("Content-Encoding") switch encoding { case "deflate": body, err = zlib.NewReader(r.Body) defer body.Close() case "gzip": body, err = gzip.NewReader(r.Body) defer body.Close() case "": default: return http.StatusBadRequest, fmt.Errorf( "Content-Encoding %s not supported", encoding, ) } if err != nil { return http.StatusInternalServerError, fmt.Errorf( "unable to create compressed reader for %s: %v", encoding, err, ) } var meta metadata if err := h.processBatch(body, buf, &meta); err != nil { return http.StatusBadRequest, fmt.Errorf("invalid request: %v", err) } if meta.IsEmpty() { return http.StatusBadRequest, errors.New("agent not found in metadata") } fileName := filepath.Join(h.basePath, agentFileName(meta)) f, err := h.agentFileMap.Get(fileName) if err != nil { return http.StatusInternalServerError, fmt.Errorf( "couldn't retrieve storage file: %s", fileName, ) } h.agentFileMap.Set(fileName, f) if _, err := io.Copy(f, buf); err != nil { return http.StatusInternalServerError, fmt.Errorf( "failed writing to file: %s: %v", fileName, err, ) } return http.StatusAccepted, nil } func (h *requestHandler) processBatch(body io.ReadCloser, buf io.Writer, meta *metadata) error { byteBuf := h.bytesBufPool.Get().([]byte) defer func() { byteBuf = byteBuf[:0] h.bufPool.Put(byteBuf) }() defer body.Close() scanner := bufio.NewScanner(body) scanner.Buffer(byteBuf, 0) var decodedMeta bool var err error for scanner.Scan() { line := scanner.Bytes() // Discard any lines that are at the maximum allowed length and aren't // completely scanned. if len(line) == maxScannerBufSize && line[len(line)-1] != '}' { continue } buf.Write(line) buf.Write([]byte("\n")) if decodedMeta { // continue scanning continue } if e := json.Unmarshal(scanner.Bytes(), &meta); e != nil { // TODO(marclop) multierror? err = e // Continue scanning, like we do in the APM Server itself. continue } // TODO(marclop) support RUM. decodedMeta = meta.V2.Service.Agent != nil } if err := scanner.Err(); err != nil { return err } return err } func agentFileName(meta metadata) string { const format = "%s-%s.ndjson" if meta.V2.Service.Agent != nil { agent := meta.V2.Service.Agent return fmt.Sprintf(format, agent.Name, agent.Version) } agent := meta.V3RUM.Service.Agent return fmt.Sprintf(format, agent.Name, agent.Version) } func logHandler(h http.HandlerFunc) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { t := time.Now() h(rw, r) log.Printf("processed request to %s (%s) in %s\n", r.URL.Path, r.Header.Get("User-Agent"), time.Since(t).String(), ) }) } func (m *fileMap) Get(p string) (*syncFile, error) { if v, ok := m.m.Load(p); ok { return v.(*syncFile), nil } // NOTE(marclop) Optionally, the files could also be truncated. f, err := os.OpenFile(p, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0660) if err != nil { return nil, err } return &syncFile{File: f}, nil } func (m *fileMap) Set(p string, f *syncFile) { m.m.Store(p, f) } type syncFile struct { // We want to avoid multiple requests writing to the same file at the same // time, since may lead to incorrect events. mu sync.Mutex *os.File } func (f *syncFile) Write(b []byte) (n int, err error) { f.mu.Lock() defer f.mu.Unlock() return f.File.Write(b) } func (f *syncFile) ReadFrom(r io.Reader) (n int64, err error) { f.mu.Lock() defer f.mu.Unlock() return f.File.ReadFrom(r) } type fileMap struct { m sync.Map } // Models // Wraps both intake v2 and v2/rum metadata formats. type metadata struct { V2 v2Metadata `json:"metadata,omitempty"` V3RUM v3RUMMetadata `json:"m,omitempty"` // NOTE(marclop), we could do some decoding of the received data // to accumulate statistics. } func (m metadata) IsEmpty() bool { return (m.V2.Service.Agent == nil || m.V2.Service.Agent.Name == "") && m.V3RUM.Service.Agent.Name == "" } type v2Metadata struct { Service model.Service `json:"service"` } type v3RUMMetadata struct { Service v3ServiceMeta `json:"se"` } type v3ServiceMeta struct { Agent metadataServiceAgent `json:"a"` Name string `json:"n"` Version string `json:"ve"` } type metadataServiceAgent struct { Name string `json:"n"` Version string `json:"ve"` }