internal/beater/api/intake/handler.go (168 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package intake
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"go.opentelemetry.io/otel/metric"
"github.com/elastic/apm-data/input/elasticapm"
"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-server/internal/beater/auth"
"github.com/elastic/apm-server/internal/beater/headers"
"github.com/elastic/apm-server/internal/beater/ratelimit"
"github.com/elastic/apm-server/internal/beater/request"
"github.com/elastic/apm-server/internal/publish"
)
const (
// batchSize is the batch size argument that Handler passes to
// StreamHandler.HandleStream for every intake request.
batchSize = 10
)
// Sentinel errors produced in this file. processStreamError matches on them
// with errors.Is to choose the request.ResultID reported to the client.
var (
// errMethodNotAllowed is returned by validateRequest when the request
// method is not POST.
errMethodNotAllowed = errors.New("only POST requests are supported")
// errServerShuttingDown is the error reported to the client by
// processStreamError in place of publish.ErrChannelClosed.
errServerShuttingDown = errors.New("server is shutting down")
// errInvalidContentType is wrapped by validateRequest when a Content-Type
// header is present but does not contain "application/x-ndjson".
errInvalidContentType = errors.New("invalid content type")
)
// RequestMetadataFunc is a function type supplied to Handler for extracting
// metadata from the request. This is used for conditionally injecting the
// source IP address as `client.ip` for RUM.
//
// Handler invokes it once per request that passes validation and hands the
// returned event to StreamHandler.HandleStream.
type RequestMetadataFunc func(*request.Context) *modelpb.APMEvent
// Handler builds the request.Handler that serves intake requests for backend
// and RUM events.
//
// Three Int64 counters are created up front from mp, on a meter named after
// this package; they are fed from the Accepted, Invalid and TooLarge fields of
// the stream result. Errors returned while creating the counters are ignored.
//
// For each request the returned handler:
//  1. writes an error response if validateRequest rejects the request;
//  2. writes an error response if c.Result.Err is already set, reporting that
//     error wrapped in compressedRequestReaderError;
//  3. otherwise passes the request body, the event produced by
//     requestMetadataFunc, batchSize and batchProcessor to
//     handler.HandleStream, updates the counters, and writes the outcome with
//     writeStreamResult.
func Handler(mp metric.MeterProvider, handler elasticapm.StreamHandler, requestMetadataFunc RequestMetadataFunc, batchProcessor modelpb.BatchProcessor) request.Handler {
	const meterName = "github.com/elastic/apm-server/internal/beater/api/intake"

	m := mp.Meter(meterName)
	acceptedCounter, _ := m.Int64Counter("apm-server.processor.stream.accepted")
	invalidCounter, _ := m.Int64Counter("apm-server.processor.stream.errors.invalid")
	tooLargeCounter, _ := m.Int64Counter("apm-server.processor.stream.errors.toolarge")

	return func(c *request.Context) {
		if invalid := validateRequest(c); invalid != nil {
			writeError(c, invalid)
			return
		}

		// An error hit while decoding the body has already been stored in
		// Result.Err by the time we get here; reformat it as an intake
		// error response instead of processing the stream.
		if decodeErr := c.Result.Err; decodeErr != nil {
			writeError(c, compressedRequestReaderError{decodeErr})
			return
		}

		var streamResult elasticapm.Result
		streamErr := handler.HandleStream(
			c.Request.Context(),
			requestMetadataFunc(c),
			c.Request.Body,
			batchSize,
			batchProcessor,
			&streamResult,
		)

		// The counters are updated whether or not HandleStream failed, using
		// a background context rather than the request's.
		metricsCtx := context.Background()
		acceptedCounter.Add(metricsCtx, int64(streamResult.Accepted))
		invalidCounter.Add(metricsCtx, int64(streamResult.Invalid))
		tooLargeCounter.Add(metricsCtx, int64(streamResult.TooLarge))

		writeStreamResult(c, streamResult, streamErr)
	}
}
// validateRequest checks that c.Request is acceptable for the intake API.
//
// It returns errMethodNotAllowed for any method other than POST. When a
// Content-Type header is present it must contain "application/x-ndjson"
// (compared case-insensitively); otherwise an error wrapping
// errInvalidContentType and quoting the header value as received is
// returned. A request without a Content-Type header is assumed to be ndjson.
// It returns nil when both checks pass.
func validateRequest(c *request.Context) error {
	if c.Request.Method != http.MethodPost {
		return errMethodNotAllowed
	}

	contentType := c.Request.Header.Get(headers.ContentType)
	if contentType == "" {
		// Unspecified: assume application/x-ndjson.
		return nil
	}

	// Media type and subtype tokens are case-insensitive (RFC 9110, section
	// 8.3.1), so match against a lower-cased copy. The error still reports
	// the header exactly as the client sent it.
	if !strings.Contains(strings.ToLower(contentType), "application/x-ndjson") {
		return fmt.Errorf("%w: '%s'", errInvalidContentType, contentType)
	}
	return nil
}
// writeError responds to c with err alone, by delegating to writeStreamResult
// with a zero-valued elasticapm.Result.
func writeError(c *request.Context, err error) {
	var noResult elasticapm.Result
	writeStreamResult(c, noResult, err)
}
// writeStreamResult turns the outcome of processing an intake stream into the
// HTTP response for c.
//
// It starts from http.StatusAccepted / request.IDResponseValidAccepted and a
// JSON body carrying streamResult.Accepted. Every error in
// streamResult.Errors, followed by streamErr when non-nil, is converted with
// processStreamError and appended to the body. The response status and result
// ID are taken from the error whose errStatusCode is numerically highest; on a
// tie the earliest such error wins. The error messages, joined with ", ", are
// passed to writeResult as a single error; with no errors that error is nil.
func writeStreamResult(c *request.Context, streamResult elasticapm.Result, streamErr error) {
	var (
		resultID = request.IDResponseValidAccepted
		status   = http.StatusAccepted
		body     = jsonResult{Accepted: streamResult.Accepted}
		messages []string
	)

	// Pre-size both slices when the stream reported errors, leaving room for
	// streamErr as well if there is one.
	if capacity := len(streamResult.Errors); capacity != 0 {
		if streamErr != nil {
			capacity++
		}
		body.Errors = make([]jsonError, 0, capacity)
		messages = make([]string, 0, capacity)
	}

	// record adds one error to the response and raises the response status
	// if this error maps to a higher status code than any seen so far.
	record := func(e error) {
		candidateID, entry := processStreamError(e)
		code := errStatusCode(candidateID)
		body.Errors = append(body.Errors, entry)
		messages = append(messages, entry.Message)
		if code > status {
			status, resultID = code, candidateID
		}
	}

	for _, e := range streamResult.Errors {
		record(e)
	}
	if streamErr != nil {
		record(streamErr)
	}

	var combined error
	if len(messages) != 0 {
		combined = errors.New(strings.Join(messages, ", "))
	}
	writeResult(c, resultID, status, &body, combined)
}
// processStreamError classifies err and builds its JSON representation.
//
// An *elasticapm.InvalidInputError (found with errors.As) yields
// request.IDResponseErrorsRequestTooLarge when its TooLarge flag is set and
// request.IDResponseErrorsValidate otherwise; the JSON error carries its
// Message and Document.
//
// Any other error yields a JSON error holding only err.Error(), with the
// result ID chosen by the first match below, or
// request.IDResponseErrorsInternal if nothing matches:
//   - compressedRequestReaderError     -> IDResponseErrorsValidate
//   - publish.ErrChannelClosed         -> IDResponseErrorsShuttingDown
//     (the message is replaced by that of errServerShuttingDown)
//   - publish.ErrFull                  -> IDResponseErrorsFullQueue
//   - errMethodNotAllowed              -> IDResponseErrorsMethodNotAllowed
//   - errInvalidContentType            -> IDResponseErrorsValidate
//   - ratelimit.ErrRateLimitExceeded   -> IDResponseErrorsRateLimit
//   - auth.ErrUnauthorized             -> IDResponseErrorsForbidden
func processStreamError(err error) (request.ResultID, jsonError) {
	var inputErr *elasticapm.InvalidInputError
	if errors.As(err, &inputErr) {
		id := request.IDResponseErrorsValidate
		if inputErr.TooLarge {
			id = request.IDResponseErrorsRequestTooLarge
		}
		return id, jsonError{
			Message:  inputErr.Message,
			Document: inputErr.Document,
		}
	}

	id := request.IDResponseErrorsInternal
	switch {
	case errors.As(err, &compressedRequestReaderError{}):
		id = request.IDResponseErrorsValidate
	case errors.Is(err, publish.ErrChannelClosed):
		id = request.IDResponseErrorsShuttingDown
		// Report the shutdown sentinel's message rather than the
		// publisher's own error text.
		err = errServerShuttingDown
	case errors.Is(err, publish.ErrFull):
		id = request.IDResponseErrorsFullQueue
	case errors.Is(err, errMethodNotAllowed):
		id = request.IDResponseErrorsMethodNotAllowed
	case errors.Is(err, errInvalidContentType):
		id = request.IDResponseErrorsValidate
	case errors.Is(err, ratelimit.ErrRateLimitExceeded):
		id = request.IDResponseErrorsRateLimit
	case errors.Is(err, auth.ErrUnauthorized):
		id = request.IDResponseErrorsForbidden
	}
	return id, jsonError{Message: err.Error()}
}
// errStatusCode returns the HTTP status code the intake API uses for errID.
//
// Two result IDs are special-cased to http.StatusBadRequest; every other ID
// uses the Code registered for it in request.MapResultIDToStatus.
func errStatusCode(errID request.ResultID) int {
	switch errID {
	case
		// TODO: remove exception case and use StatusMethodNotAllowed (breaking bugfix)
		request.IDResponseErrorsMethodNotAllowed,
		// TODO: remove exception case and use StatusRequestEntityTooLarge (breaking bugfix)
		request.IDResponseErrorsRequestTooLarge:
		return http.StatusBadRequest
	}
	return request.MapResultIDToStatus[errID].Code
}
// writeResult stores the outcome on c.Result and writes the response.
//
// result is included as the response body when statusCode is 400 or above,
// or when the request URL carries a "verbose" query parameter; otherwise the
// body is left nil. For statusCode 400 or above a "Connection: Close" header
// is also added. The keyword passed to c.Result.Set is the one registered for
// id in request.MapResultIDToStatus.
func writeResult(c *request.Context, id request.ResultID, statusCode int, result *jsonResult, err error) {
	includeBody := statusCode >= http.StatusBadRequest
	if includeBody {
		// this signals to the client that we're closing the connection
		// but also signals to http.Server that it should close it:
		// https://golang.org/src/net/http/server.go#L1254
		c.ResponseWriter.Header().Add(headers.Connection, "Close")
	} else {
		// Successful responses only carry a body on request.
		_, includeBody = c.Request.URL.Query()["verbose"]
	}

	var body interface{}
	if includeBody {
		body = result
	}

	keyword := request.MapResultIDToStatus[id].Keyword
	c.Result.Set(id, statusCode, keyword, body, err)
	c.WriteResult()
}
// compressedRequestReaderError marks an error that Handler found already set
// in c.Result.Err before streaming the body. processStreamError recognises
// the type with errors.As and reports it as request.IDResponseErrorsValidate.
// The embedded error supplies the Error method.
type compressedRequestReaderError struct {
error
}
// jsonResult is the response body that writeStreamResult builds and
// writeResult may attach to the response.
type jsonResult struct {
// Accepted is copied from elasticapm.Result.Accepted.
Accepted int `json:"accepted"`
// Errors holds one entry per error passed through processStreamError;
// it is omitted from the JSON when empty.
Errors []jsonError `json:"errors,omitempty"`
}
// jsonError is a single entry in jsonResult.Errors, produced by
// processStreamError.
type jsonError struct {
// Message is the error text: InvalidInputError.Message for invalid
// input, err.Error() for everything else.
Message string `json:"message"`
// Document is set only from InvalidInputError.Document and is omitted
// from the JSON when empty.
Document string `json:"document,omitempty"`
}