receiver/datadogreceiver/receiver.go (400 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"github.com/DataDog/agent-payload/v5/gogen"
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/tinylib/msgp/msgp"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator/header"
)
// datadogReceiver is the receiver implemented in this file. It owns the HTTP
// server, the downstream consumers and the translators used by the handlers.
type datadogReceiver struct {
// address is the listener address recorded by Start.
address string
// config is the receiver configuration; Start uses it to build the server and the listener.
config *Config
// params carries the receiver settings (ID, build info, logger, telemetry settings).
params receiver.Settings
// nextTracesConsumer receives translated traces; the trace endpoints are only registered when it is non-nil.
nextTracesConsumer consumer.Traces
// nextMetricsConsumer receives translated metrics; the metrics endpoints are only registered when it is non-nil.
nextMetricsConsumer consumer.Metrics
// metricsTranslator translates series, service check and sketch payloads into metrics.
metricsTranslator *translator.MetricsTranslator
// statsTranslator translates client stats payloads into metrics.
statsTranslator *translator.StatsTranslator
// server is the HTTP server; it is replaced in Start and stopped in Shutdown.
server *http.Server
// tReceiver reports the start and end of every traces/metrics receive operation.
tReceiver *receiverhelper.ObsReport
}
// Endpoint specifies an API endpoint definition: a URL pattern paired with the
// function that serves it.
type Endpoint struct {
// Pattern specifies the API pattern, as registered on the HTTP mux.
Pattern string
// Handler specifies the handler function invoked for requests matching Pattern.
Handler func(http.ResponseWriter, *http.Request)
}
// getEndpoints returns the endpoints to register for this receiver.
//
// The root endpoint is always present. Trace endpoints are added only when a
// traces consumer is configured, and metrics endpoints only when a metrics
// consumer is configured. The /info endpoint is appended last so that its
// payload lists every endpoint registered before it (but not /info itself).
func (ddr *datadogReceiver) getEndpoints() []Endpoint {
	// The root endpoint unconditionally answers 200 OK.
	endpoints := []Endpoint{{
		Pattern: "/",
		Handler: func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
		},
	}}

	if ddr.nextTracesConsumer != nil {
		// Every trace API version is served by the same handler.
		for _, pattern := range []string{
			"/v0.3/traces",
			"/v0.4/traces",
			"/v0.5/traces",
			"/v0.7/traces",
			"/api/v0.2/traces",
		} {
			endpoints = append(endpoints, Endpoint{Pattern: pattern, Handler: ddr.handleTraces})
		}
	}

	if ddr.nextMetricsConsumer != nil {
		endpoints = append(endpoints,
			Endpoint{Pattern: "/api/v1/series", Handler: ddr.handleV1Series},
			Endpoint{Pattern: "/api/v2/series", Handler: ddr.handleV2Series},
			Endpoint{Pattern: "/api/v1/check_run", Handler: ddr.handleCheckRun},
			Endpoint{Pattern: "/api/v1/sketches", Handler: ddr.handleSketches},
			Endpoint{Pattern: "/api/beta/sketches", Handler: ddr.handleSketches},
			Endpoint{Pattern: "/intake", Handler: ddr.handleIntake},
			Endpoint{Pattern: "/api/v1/distribution_points", Handler: ddr.handleDistributionPoints},
			Endpoint{Pattern: "/v0.6/stats", Handler: ddr.handleStats},
		)
	}

	// The /info payload is rendered once here and reused for every request.
	// The marshalling error is deliberately discarded: on failure the handler
	// simply serves an empty body.
	infoResponse, _ := ddr.buildInfoResponse(endpoints)
	return append(endpoints, Endpoint{
		Pattern: "/info",
		Handler: func(w http.ResponseWriter, r *http.Request) {
			ddr.handleInfo(w, r, infoResponse)
		},
	})
}
// newDataDogReceiver builds a datadogReceiver from the given configuration and
// receiver settings. It returns an error when the observability reporter
// cannot be created.
func newDataDogReceiver(config *Config, params receiver.Settings) (component.Component, error) {
	obsSettings := receiverhelper.ObsReportSettings{
		LongLivedCtx:           false,
		ReceiverID:             params.ID,
		Transport:              "http",
		ReceiverCreateSettings: params,
	}
	obsReport, err := receiverhelper.NewObsReport(obsSettings)
	if err != nil {
		return nil, err
	}

	ddr := &datadogReceiver{
		params: params,
		config: config,
		// Placeholder server; Start replaces it with the one built from config.
		server: &http.Server{
			ReadTimeout: config.ReadTimeout,
		},
		tReceiver:         obsReport,
		metricsTranslator: translator.NewMetricsTranslator(params.BuildInfo),
		statsTranslator:   translator.NewStatsTranslator(),
	}
	return ddr, nil
}
// Start registers every endpoint on a fresh mux, builds the HTTP server and
// listener from the receiver configuration, and begins serving in a background
// goroutine.
//
// It returns an error when the server or the listener cannot be created. A
// serve error other than http.ErrServerClosed is reported to the host as a
// fatal component status event.
func (ddr *datadogReceiver) Start(ctx context.Context, host component.Host) error {
	ddmux := http.NewServeMux()
	for _, e := range ddr.getEndpoints() {
		ddmux.HandleFunc(e.Pattern, e.Handler)
	}

	var err error
	ddr.server, err = ddr.config.ToServer(
		ctx,
		host,
		ddr.params.TelemetrySettings,
		ddmux,
	)
	if err != nil {
		return fmt.Errorf("failed to create server definition: %w", err)
	}
	// The server returned by ToServer replaces the one created in the
	// constructor, so the configured read timeout has to be applied again or it
	// would be silently dropped. A non-positive value leaves whatever ToServer
	// configured untouched.
	if ddr.config.ReadTimeout > 0 {
		ddr.server.ReadTimeout = ddr.config.ReadTimeout
	}

	hln, err := ddr.config.ToListener(ctx)
	if err != nil {
		return fmt.Errorf("failed to create datadog listener: %w", err)
	}
	ddr.address = hln.Addr().String()

	go func() {
		// Serve returns http.ErrServerClosed after Shutdown; that is not a failure.
		if err := ddr.server.Serve(hln); err != nil && !errors.Is(err, http.ErrServerClosed) {
			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(fmt.Errorf("error starting datadog receiver: %w", err)))
		}
	}()
	return nil
}
// Shutdown stops the HTTP server and returns whatever error the server's
// Shutdown reports for the given context.
func (ddr *datadogReceiver) Shutdown(ctx context.Context) (err error) {
	err = ddr.server.Shutdown(ctx)
	return err
}
// buildInfoResponse renders the tab-indented JSON body served by /info. The
// body carries a version string derived from the collector build info and the
// patterns of the given endpoints, in the order they were passed in.
func (ddr *datadogReceiver) buildInfoResponse(endpoints []Endpoint) ([]byte, error) {
	var patterns []string
	for i := range endpoints {
		patterns = append(patterns, endpoints[i].Pattern)
	}

	build := ddr.params.BuildInfo
	info := translator.DDInfo{
		Version:          "datadogreceiver-" + build.Command + "-" + build.Version,
		Endpoints:        patterns,
		ClientDropP0s:    false,
		SpanMetaStructs:  false,
		LongRunningSpans: false,
	}
	return json.MarshalIndent(info, "", "\t")
}
// handleInfo serves the /info endpoint by writing the pre-rendered
// infoResponse body. A write failure is logged and answered with a 500.
func (ddr *datadogReceiver) handleInfo(w http.ResponseWriter, _ *http.Request, infoResponse []byte) {
	if _, err := w.Write(infoResponse); err != nil {
		ddr.params.Logger.Error("Error writing /info endpoint response", zap.Error(err))
		http.Error(w, "Error writing /info endpoint response", http.StatusInternalServerError)
	}
}
// handleTraces serves every registered traces endpoint.
//
// The request is decoded into Datadog tracer payloads; each payload is
// translated and handed to the next traces consumer. A decoding failure is
// answered with 400, and a consumer failure is mapped to an HTTP status by
// errorutil.HTTPError. On success the response body depends on the API version
// found in the request URI.
func (ddr *datadogReceiver) handleTraces(w http.ResponseWriter, req *http.Request) {
	if req.ContentLength == 0 { // The Datadog SDK pings with an empty-body request when GET /info is not implemented.
		http.Error(w, "Fake featuresdiscovery", http.StatusBadRequest) // Any status other than 404 is treated as OK by the Datadog SDK.
		return
	}

	obsCtx := ddr.tReceiver.StartTracesOp(req.Context())
	var err error
	var spanCount int
	// The closure reads err and spanCount when the handler returns, so it
	// reports their final values.
	defer func() {
		ddr.tReceiver.EndTracesOp(obsCtx, "datadog", spanCount, err)
	}()

	var ddTraces []*pb.TracerPayload
	ddTraces, err = translator.HandleTracesPayload(req)
	if err != nil {
		http.Error(w, "Unable to unmarshal reqs", http.StatusBadRequest)
		ddr.params.Logger.Error("Unable to unmarshal reqs", zap.Error(err))
		return
	}

	for _, ddTrace := range ddTraces {
		otelTraces := translator.ToTraces(ddTrace, req)
		// Accumulate over all payloads of the request: a plain assignment here
		// would make the operation report only the last payload's spans.
		spanCount += otelTraces.SpanCount()
		err = ddr.nextTracesConsumer.ConsumeTraces(obsCtx, otelTraces)
		if err != nil {
			errorutil.HTTPError(w, err)
			ddr.params.Logger.Error("Trace consumer errored out", zap.Error(err))
			return
		}
	}

	responseBody := "OK"
	contentType := "text/plain"
	// A URI of the form "/<version>/traces" splits into exactly three parts;
	// any other shape (e.g. "/api/v0.2/traces") keeps the plain "OK" response.
	if urlSplit := strings.Split(req.RequestURI, "/"); len(urlSplit) == 3 {
		// Match the response logic from dd-agent https://github.com/DataDog/datadog-agent/blob/86b2ae24f93941447a5bf0a2b6419caed77e76dd/pkg/trace/api/api.go#L511-L519
		switch urlSplit[1] {
		case "v0.1", "v0.2", "v0.3":
			// Keep the "OK" response for these versions
		default:
			contentType = "application/json"
			responseBody = "{}"
		}
	}
	w.Header().Set("Content-Type", contentType)
	_, _ = w.Write([]byte(responseBody))
}
// handleV1Series handles the v1 series endpoint https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
//
// The JSON body is read into a pooled buffer, decoded into a series list,
// translated to metrics and passed to the next metrics consumer. On success it
// replies 202 with {"status":"ok"}.
func (ddr *datadogReceiver) handleV1Series(w http.ResponseWriter, req *http.Request) {
	var (
		err        error
		dataPoints int
	)
	obsCtx := ddr.tReceiver.StartMetricsOp(req.Context())
	// Reports the final values of dataPoints and err when the handler returns.
	defer func() {
		ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", dataPoints, err)
	}()

	body := translator.GetBuffer()
	defer translator.PutBuffer(body)

	_, err = io.Copy(body, req.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		ddr.params.Logger.Error(err.Error())
		return
	}

	seriesList := translator.SeriesList{}
	if err = json.Unmarshal(body.Bytes(), &seriesList); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		ddr.params.Logger.Error(err.Error())
		return
	}

	translated := ddr.metricsTranslator.TranslateSeriesV1(seriesList)
	dataPoints = translated.DataPointCount()
	if err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, translated); err != nil {
		errorutil.HTTPError(w, err)
		ddr.params.Logger.Error("metrics consumer errored out", zap.Error(err))
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	// Best-effort write: the status line has already been sent.
	_ = json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
}
// handleV2Series handles the v2 series endpoint https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
//
// The payload is decoded by the metrics translator, translated to metrics and
// passed to the next metrics consumer. On success it replies 202 with
// {"errors":[]}.
func (ddr *datadogReceiver) handleV2Series(w http.ResponseWriter, req *http.Request) {
	var (
		err        error
		dataPoints int
	)
	obsCtx := ddr.tReceiver.StartMetricsOp(req.Context())
	// Reports the final values of dataPoints and err when the handler returns.
	defer func() {
		ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", dataPoints, err)
	}()

	// err lives in this same scope, so := assigns to it rather than shadowing
	// it; the deferred report therefore sees a decoding failure.
	payload, err := ddr.metricsTranslator.HandleSeriesV2Payload(req)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		ddr.params.Logger.Error(err.Error())
		return
	}

	translated := ddr.metricsTranslator.TranslateSeriesV2(payload)
	dataPoints = translated.DataPointCount()
	if err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, translated); err != nil {
		errorutil.HTTPError(w, err)
		ddr.params.Logger.Error("metrics consumer errored out", zap.Error(err))
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	// Best-effort write: the status line has already been sent.
	_ = json.NewEncoder(w).Encode(map[string]any{"errors": []string{}})
}
// handleCheckRun handles the service checks endpoint https://docs.datadoghq.com/api/latest/service-checks/
//
// The JSON body is read into a pooled buffer, decoded into a list of service
// checks, translated to metrics and passed to the next metrics consumer. On
// success it replies 202 with {"status":"ok"}.
func (ddr *datadogReceiver) handleCheckRun(w http.ResponseWriter, req *http.Request) {
	var (
		err        error
		dataPoints int
	)
	obsCtx := ddr.tReceiver.StartMetricsOp(req.Context())
	// Reports the final values of dataPoints and err when the handler returns.
	defer func() {
		ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", dataPoints, err)
	}()

	body := translator.GetBuffer()
	defer translator.PutBuffer(body)

	_, err = io.Copy(body, req.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		ddr.params.Logger.Error(err.Error())
		return
	}

	var checks []translator.ServiceCheck
	if err = json.Unmarshal(body.Bytes(), &checks); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		ddr.params.Logger.Error(err.Error())
		return
	}

	translated := ddr.metricsTranslator.TranslateServices(checks)
	dataPoints = translated.DataPointCount()
	if err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, translated); err != nil {
		errorutil.HTTPError(w, err)
		ddr.params.Logger.Error("metrics consumer errored out", zap.Error(err))
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	// Best-effort write: the status line has already been sent.
	_ = json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
}
// handleSketches handles sketches, the underlying data structure of distributions https://docs.datadoghq.com/metrics/distributions/
//
// The payload is decoded by the metrics translator, translated to metrics and
// passed to the next metrics consumer. On success it replies 202 with "OK".
func (ddr *datadogReceiver) handleSketches(w http.ResponseWriter, req *http.Request) {
	var (
		err        error
		dataPoints int
		ddSketches []gogen.SketchPayload_Sketch
	)
	obsCtx := ddr.tReceiver.StartMetricsOp(req.Context())
	// Reports the final values of dataPoints and err when the handler returns.
	defer func() {
		ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", dataPoints, err)
	}()

	if ddSketches, err = ddr.metricsTranslator.HandleSketchesPayload(req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		ddr.params.Logger.Error(err.Error())
		return
	}

	translated := ddr.metricsTranslator.TranslateSketches(ddSketches)
	dataPoints = translated.DataPointCount()
	if err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, translated); err != nil {
		errorutil.HTTPError(w, err)
		ddr.params.Logger.Error("metrics consumer errored out", zap.Error(err))
		return
	}

	w.WriteHeader(http.StatusAccepted)
	_, _ = w.Write([]byte("OK"))
}
// handleIntake handles operational calls made by the agent to submit host tags and other metadata to the backend.
//
// The endpoint is not implemented: every request is answered with 405 and
// recorded as a failed metrics operation carrying zero data points.
func (ddr *datadogReceiver) handleIntake(w http.ResponseWriter, req *http.Request) {
	obsCtx := ddr.tReceiver.StartMetricsOp(req.Context())
	err := errors.New("intake endpoint not implemented")
	// Deferred so the operation is closed after the response and the log line.
	defer func() {
		ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", 0, err)
	}()

	http.Error(w, err.Error(), http.StatusMethodNotAllowed)
	// No consumer is involved here, so the log states the real reason for the
	// rejection instead of blaming the metrics consumer.
	ddr.params.Logger.Warn("request to unimplemented endpoint rejected", zap.Error(err))
}
// handleDistributionPoints handles the distribution points endpoint https://docs.datadoghq.com/api/latest/metrics/#submit-distribution-points
//
// The endpoint is not implemented: every request is answered with 405 and
// recorded as a failed metrics operation carrying zero data points.
func (ddr *datadogReceiver) handleDistributionPoints(w http.ResponseWriter, req *http.Request) {
	obsCtx := ddr.tReceiver.StartMetricsOp(req.Context())
	err := errors.New("distribution points endpoint not implemented")
	// Deferred so the operation is closed after the response and the log line.
	defer func() {
		ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", 0, err)
	}()

	http.Error(w, err.Error(), http.StatusMethodNotAllowed)
	// No consumer is involved here, so the log states the real reason for the
	// rejection instead of blaming the metrics consumer.
	ddr.params.Logger.Warn("request to unimplemented endpoint rejected", zap.Error(err))
}
// handleStats handles incoming stats payloads.
//
// The msgpack body is decoded into a client stats payload, translated to
// metrics (using the language and tracer version request headers) and passed
// to the next metrics consumer. Decoding and translation failures are answered
// with 400; on success it replies "OK".
func (ddr *datadogReceiver) handleStats(w http.ResponseWriter, req *http.Request) {
	var (
		err        error
		dataPoints int
	)
	obsCtx := ddr.tReceiver.StartMetricsOp(req.Context())
	// Reports the final values of dataPoints and err when the handler returns.
	defer func() {
		ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", dataPoints, err)
	}()

	req.Header.Set("Accept", "application/msgpack")

	clientStats := &pb.ClientStatsPayload{}
	if err = msgp.Decode(req.Body, clientStats); err != nil {
		ddr.params.Logger.Error("Error decoding pb.ClientStatsPayload", zap.Error(err))
		http.Error(w, "Error decoding pb.ClientStatsPayload", http.StatusBadRequest)
		return
	}

	lang := req.Header.Get(header.Lang)
	tracerVersion := req.Header.Get(header.TracerVersion)
	// err lives in this same scope, so := assigns to it rather than shadowing it.
	translated, err := ddr.statsTranslator.TranslateStats(clientStats, lang, tracerVersion)
	if err != nil {
		ddr.params.Logger.Error("Error translating stats", zap.Error(err))
		http.Error(w, "Error translating stats", http.StatusBadRequest)
		return
	}

	dataPoints = translated.DataPointCount()
	if err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, translated); err != nil {
		ddr.params.Logger.Error("Metrics consumer errored out", zap.Error(err))
		errorutil.HTTPError(w, err)
		return
	}

	_, _ = w.Write([]byte("OK"))
}