receiver/elasticapmreceiver/agentcfg.go (106 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package elasticapmreceiver // import "github.com/elastic/opentelemetry-collector-components/receiver/elasticapmreceiver" import ( "context" "encoding/json" "errors" "fmt" "net/http" "strings" "github.com/elastic/opentelemetry-lib/agentcfg" "go.opentelemetry.io/collector/component" ) // http header keys const ( AccessControlExposeHeaders = "Access-Control-Expose-Headers" APIKey = "ApiKey" CacheControl = "Cache-Control" ContentType = "Content-Type" Etag = "Etag" IfNoneMatch = "If-None-Match" ) const ( msgMethodUnsupported = "method not supported" ) func (r *elasticAPMReceiver) newElasticAPMConfigsHandler(ctx context.Context, host component.Host) http.HandlerFunc { mapBodyError := func(err string) map[string]string { return map[string]string{"error": err} } // write a json response and log any encoding error encodeJsonLogError := func(w http.ResponseWriter, data any) { w.Header().Set(ContentType, "application/json") err := json.NewEncoder(w).Encode(data) if err != nil { r.settings.Logger.Error(fmt.Sprintf("error encoding json response: %s", err.Error())) } } // servers where the Kibana/ES connection is not enabled (server responds with 403) // https://github.com/elastic/apm/blob/main/specs/agents/configuration.md#dealing-with-errors if r.fetcherFactory == nil { r.settings.Logger.Warn("no agent configuration fetcher available") return func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusForbidden) encodeJsonLogError(w, mapBodyError("remote configuration fetcher not enabled")) } } fetcher, err := r.fetcherFactory(ctx, host) if err != nil { r.settings.Logger.Error(fmt.Sprintf("could not start agent configuration fetcher: %s", err.Error())) // servers where the Kibana/ES connection is enabled, but unavailable (server responds with 503) return func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) encodeJsonLogError(w, mapBodyError(err.Error())) } } return func(w http.ResponseWriter, req *http.Request) { query, queryErr := buildQuery(req) if queryErr != nil { w.WriteHeader(http.StatusBadRequest) encodeJsonLogError(w, mapBodyError(queryErr.Error())) return } result, err := fetcher.Fetch(req.Context(), query) if err != nil { w.WriteHeader(http.StatusBadRequest) encodeJsonLogError(w, mapBodyError(err.Error())) return } // configuration successfully fetched w.Header().Set(CacheControl, fmt.Sprintf("max-age=%v, must-revalidate", r.cfg.AgentConfig.CacheDuration.Seconds())) w.Header().Set(Etag, fmt.Sprintf("%q", result.Source.Etag)) w.Header().Set(AccessControlExposeHeaders, Etag) if result.Source.Etag == query.Etag { // c.Result.SetDefault(request.IDResponseValidNotModified) w.WriteHeader(http.StatusNotModified) } else { // c.Result.SetWithBody(request.IDResponseValidOK, result.Source.Settings) w.WriteHeader(http.StatusOK) encodeJsonLogError(w, result.Source.Settings) } } } func buildQuery(r *http.Request) (agentcfg.Query, error) { var query agentcfg.Query switch r.Method { case http.MethodPost: if err := json.NewDecoder(r.Body).Decode(&query); err != nil { return query, err } case http.MethodGet: params := r.URL.Query() query = agentcfg.Query{ Service: agentcfg.Service{ Name: params.Get(agentcfg.ServiceName), Environment: params.Get(agentcfg.ServiceEnv), }, } default: if err := fmt.Errorf("%s: %s", msgMethodUnsupported, r.Method); err != nil { return query, err } } if query.Service.Name == "" { return query, errors.New(agentcfg.ServiceName + " is required") } if query.Etag == "" { query.Etag = ifNoneMatch(r) } return query, nil } func ifNoneMatch(r *http.Request) string { if h := r.Header.Get("If-None-Match"); h != "" { return strings.Replace(h, "\"", "", -1) } return r.URL.Query().Get(agentcfg.Etag) }