internal/beater/api/config/agent/handler.go (190 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package agent
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"github.com/pkg/errors"
"github.com/elastic/apm-server/internal/agentcfg"
"github.com/elastic/apm-server/internal/beater/auth"
"github.com/elastic/apm-server/internal/beater/headers"
"github.com/elastic/apm-server/internal/beater/request"
)
const (
errMaxAgeDuration = 5 * time.Minute
msgInvalidQuery = "invalid query"
msgMethodUnsupported = "method not supported"
msgServiceUnavailable = "service unavailable"
)
var (
errCacheControl = fmt.Sprintf("max-age=%v, must-revalidate", errMaxAgeDuration.Seconds())
)
type handler struct {
f agentcfg.Fetcher
allowAnonymousAgents []string
cacheControl, defaultServiceEnvironment string
}
func NewHandler(
f agentcfg.Fetcher,
cacheMaxAge time.Duration,
defaultServiceEnvironment string,
allowAnonymousAgents []string,
) request.Handler {
if f == nil {
panic("fetcher must not be nil")
}
cacheControl := fmt.Sprintf("max-age=%v, must-revalidate", cacheMaxAge.Seconds())
h := &handler{
f: f,
cacheControl: cacheControl,
defaultServiceEnvironment: defaultServiceEnvironment,
allowAnonymousAgents: allowAnonymousAgents,
}
return h.Handle
}
// Handler implements request.Handler for managing agent central configuration
// requests.
func (h *handler) Handle(c *request.Context) {
// error handling
c.ResponseWriter.Header().Set(headers.CacheControl, errCacheControl)
query, queryErr := buildQuery(c)
if queryErr != nil {
extractQueryError(c, queryErr)
c.WriteResult()
return
}
if query.Service.Environment == "" {
query.Service.Environment = h.defaultServiceEnvironment
}
// Only service, and not agent, is known for config queries.
// For anonymous/untrusted agents, we filter the results using
// query.InsecureAgents below.
authResource := auth.Resource{ServiceName: query.Service.Name}
if err := auth.Authorize(c.Request.Context(), auth.ActionAgentConfig, authResource); err != nil {
if errors.Is(err, auth.ErrUnauthorized) {
id := request.IDResponseErrorsForbidden
status := request.MapResultIDToStatus[id]
c.Result.Set(id, status.Code, err.Error(), nil, nil)
} else {
c.Result.SetDefault(request.IDResponseErrorsServiceUnavailable)
c.Result.Err = err
}
c.WriteResult()
return
}
if c.Authentication.Method == auth.MethodAnonymous {
// Unauthenticated client, restrict results.
query.InsecureAgents = h.allowAnonymousAgents
}
result, err := h.f.Fetch(c.Request.Context(), query)
if err != nil {
extractInternalError(c, err)
c.WriteResult()
return
}
// configuration successfully fetched
c.ResponseWriter.Header().Set(headers.CacheControl, h.cacheControl)
c.ResponseWriter.Header().Set(headers.Etag, fmt.Sprintf("\"%s\"", result.Source.Etag))
c.ResponseWriter.Header().Set(headers.AccessControlExposeHeaders, headers.Etag)
if result.Source.Etag == ifNoneMatch(c) {
c.Result.SetDefault(request.IDResponseValidNotModified)
} else {
c.Result.SetWithBody(request.IDResponseValidOK, result.Source.Settings)
}
c.WriteResult()
}
func buildQuery(c *request.Context) (agentcfg.Query, error) {
r := c.Request
var query agentcfg.Query
switch r.Method {
case http.MethodPost:
if err := json.NewDecoder(r.Body).Decode(&query); err != nil {
return query, err
}
case http.MethodGet:
params := r.URL.Query()
query = agentcfg.Query{
Service: agentcfg.Service{
Name: params.Get(agentcfg.ServiceName),
Environment: params.Get(agentcfg.ServiceEnv),
},
}
default:
if err := errors.Errorf("%s: %s", msgMethodUnsupported, r.Method); err != nil {
return query, err
}
}
if query.Service.Name == "" {
return query, errors.New(agentcfg.ServiceName + " is required")
}
query.Etag = ifNoneMatch(c)
return query, nil
}
func extractInternalError(c *request.Context, err error) {
msg := err.Error()
var body interface{}
var keyword string
switch {
case strings.Contains(msg, agentcfg.ErrMsgSendToKibanaFailed):
body = authErrMsg(c, msg, agentcfg.ErrMsgSendToKibanaFailed)
keyword = agentcfg.ErrMsgSendToKibanaFailed
case strings.Contains(msg, agentcfg.ErrMsgReadKibanaResponse):
body = authErrMsg(c, msg, agentcfg.ErrMsgReadKibanaResponse)
keyword = agentcfg.ErrMsgReadKibanaResponse
case strings.Contains(msg, agentcfg.ErrUnauthorized):
fullMsg := "APM Server is not authorized to query Kibana. " +
"Please configure apm-server.kibana.username and apm-server.kibana.password, " +
"and ensure the user has the necessary privileges."
body = authErrMsg(c, fullMsg, agentcfg.ErrUnauthorized)
keyword = agentcfg.ErrUnauthorized
case strings.Contains(msg, agentcfg.ErrNoValidElasticsearchConfig):
body = "Your Elasticsearch configuration does not support agent config queries. Check your configurations at `output.elasticsearch` or `apm-server.agent.config.elasticsearch`."
c.Result.Set(
request.IDResponseErrorsForbidden,
http.StatusForbidden,
keyword,
body,
err,
)
return
case strings.Contains(msg, agentcfg.ErrInfrastructureNotReady):
body = "Agent configuration infrastructure is not ready. Please retry later."
c.Result.Set(
request.IDResponseErrorsServiceUnavailable,
http.StatusServiceUnavailable,
keyword,
body,
err,
)
return
default:
body = authErrMsg(c, msg, msgServiceUnavailable)
keyword = msgServiceUnavailable
}
c.Result.Set(request.IDResponseErrorsServiceUnavailable,
http.StatusServiceUnavailable,
keyword,
body,
err)
}
func extractQueryError(c *request.Context, err error) {
msg := err.Error()
if strings.Contains(msg, msgMethodUnsupported) {
c.Result.Set(request.IDResponseErrorsMethodNotAllowed,
http.StatusMethodNotAllowed,
msgMethodUnsupported,
authErrMsg(c, msg, msgMethodUnsupported),
err)
return
}
c.Result.Set(request.IDResponseErrorsInvalidQuery,
http.StatusBadRequest,
msgInvalidQuery,
authErrMsg(c, msg, msgInvalidQuery),
err)
}
func authErrMsg(c *request.Context, fullMsg, shortMsg string) string {
if c.Authentication.Method != auth.MethodAnonymous {
return fullMsg
}
return shortMsg
}
func ifNoneMatch(c *request.Context) string {
if h := c.Request.Header.Get(headers.IfNoneMatch); h != "" {
return strings.Replace(h, "\"", "", -1)
}
return c.Request.URL.Query().Get(agentcfg.Etag)
}