receiver/elasticapmreceiver/receiver.go (289 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package elasticapmreceiver // import "github.com/elastic/opentelemetry-collector-components/receiver/elasticapmreceiver" import ( "context" "encoding/json" "errors" "fmt" "hash" "net" "net/http" "sync" "time" "github.com/cespare/xxhash" "github.com/elastic/apm-data/input/elasticapm" "github.com/elastic/apm-data/model/modelpb" "github.com/elastic/apm-data/model/modelprocessor" "github.com/elastic/opentelemetry-collector-components/receiver/elasticapmreceiver/internal/mappers" "github.com/elastic/opentelemetry-lib/agentcfg" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" "golang.org/x/sync/semaphore" ) // TODO report different formats for intakev2 and rumv3? const dataFormatElasticAPM = "elasticapm" const ( agentConfigPath = "/config/v1/agents" intakeV2EventsPath = "/intake/v2/events" ) type agentCfgFetcherFactory = func(context.Context, component.Host) (agentcfg.Fetcher, error) // elasticAPMReceiver implements support for receiving Logs, Metrics, and Traces from Elastic APM agents. type elasticAPMReceiver struct { cfg *Config obsreport *receiverhelper.ObsReport settings receiver.Settings nextTraces consumer.Traces nextMetrics consumer.Metrics nextLogs consumer.Logs httpServer *http.Server shutdownWG sync.WaitGroup fetcherFactory agentCfgFetcherFactory cancelFn context.CancelFunc } // newElasticAPMReceiver just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. func newElasticAPMReceiver(fetcher agentCfgFetcherFactory, cfg *Config, set receiver.Settings) (*elasticAPMReceiver, error) { obsreport, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "http", ReceiverCreateSettings: set, }) if err != nil { return nil, err } return &elasticAPMReceiver{ cfg: cfg, settings: set, obsreport: obsreport, fetcherFactory: fetcher, }, nil } // Start runs an HTTP server for receiving data from Elastic APM agents. func (r *elasticAPMReceiver) Start(ctx context.Context, host component.Host) error { ctx, r.cancelFn = context.WithCancel(ctx) if err := r.startHTTPServer(ctx, host); err != nil { return errors.Join(err, r.Shutdown(ctx)) } return nil } func (r *elasticAPMReceiver) startHTTPServer(ctx context.Context, host component.Host) error { httpMux := http.NewServeMux() httpMux.HandleFunc(intakeV2EventsPath, r.newElasticAPMEventsHandler()) httpMux.HandleFunc(agentConfigPath, r.newElasticAPMConfigsHandler(ctx, host)) // TODO rum v2, v3 var err error if r.httpServer, err = r.cfg.ToServer( ctx, host, r.settings.TelemetrySettings, httpMux, confighttp.WithErrorHandler(errorHandler), ); err != nil { return err } r.settings.Logger.Info("Starting HTTP server", zap.String("endpoint", r.cfg.ServerConfig.Endpoint)) var hln net.Listener if hln, err = r.cfg.ServerConfig.ToListener(ctx); err != nil { return err } r.shutdownWG.Add(1) go func() { defer r.shutdownWG.Done() if errHTTP := r.httpServer.Serve(hln); errHTTP != nil && !errors.Is(errHTTP, http.ErrServerClosed) { componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(errHTTP)) } }() return nil } // Shutdown is a method to turn off receiving. func (r *elasticAPMReceiver) Shutdown(ctx context.Context) error { var err error if r.cancelFn != nil { r.cancelFn() } if r.httpServer != nil { err = r.httpServer.Shutdown(ctx) } r.shutdownWG.Wait() return err } func errorHandler(w http.ResponseWriter, r *http.Request, errMsg string, statusCode int) { // TODO } func (r *elasticAPMReceiver) newElasticAPMEventsHandler() http.HandlerFunc { var ( // TODO make semaphore size configurable and/or find a different way // to limit concurrency that fits better with OTel Collector. sem = semaphore.NewWeighted(100) // TODO make event size configurable maxEventSize = 1024 * 1024 // 1MiB // TODO make batch size configurable? batchSize = 10 ) batchProcessor := modelpb.ProcessBatchFunc(r.processBatch) elasticapmProcessor := elasticapm.NewProcessor(elasticapm.Config{ Logger: r.settings.Logger, MaxEventSize: maxEventSize, Semaphore: sem, }) return func(w http.ResponseWriter, r *http.Request) { statusCode := http.StatusAccepted var elasticapmResult elasticapm.Result baseEvent := &modelpb.APMEvent{} baseEvent.Event = &modelpb.Event{} streamErr := elasticapmProcessor.HandleStream( r.Context(), baseEvent, r.Body, batchSize, batchProcessor, &elasticapmResult, ) _ = streamErr // TODO record metrics about errors? var result struct { Accepted int `json:"accepted"` Errors []jsonError `json:"errors,omitempty"` } result.Accepted = elasticapmResult.Accepted // TODO process elasticapmResult.Errors, add to result // TODO process streamErr, conditionally add to result // TODO process r.Context().Err(), conditionally add to result w.WriteHeader(statusCode) _ = json.NewEncoder(w).Encode(&result) } } func (r *elasticAPMReceiver) processBatch(ctx context.Context, batch *modelpb.Batch) error { ld := plog.NewLogs() md := pmetric.NewMetrics() td := ptrace.NewTraces() gk := modelprocessor.SetGroupingKey{ NewHash: func() hash.Hash { return xxhash.New() }, } if err := gk.ProcessBatch(ctx, batch); err != nil { r.settings.Logger.Error("failed to process batch", zap.Error(err)) } for _, event := range *batch { timestampNanos := event.GetTimestamp() timestamp := time.Unix( int64(timestampNanos/1e9), // Convert nanoseconds to seconds int64(timestampNanos%1e9), // Remainder in nanoseconds ) // TODO record metrics about events processed by type? // TODO translate events to pdata types switch event.Type() { case modelpb.MetricEventType: rm := md.ResourceMetrics().AppendEmpty() sm := rm.ScopeMetrics().AppendEmpty() metricset := event.GetMetricset() // TODO interval, doc_count // TODO how can we attach metricset.name? for _, sample := range metricset.GetSamples() { m := sm.Metrics().AppendEmpty() m.SetName(sample.GetName()) m.SetUnit(sample.GetUnit()) // TODO set attributes (dimensions/labels) switch sample.GetType() { case modelpb.MetricType_METRIC_TYPE_COUNTER: dp := m.SetEmptySum().DataPoints().AppendEmpty() dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) dp.SetDoubleValue(sample.GetValue()) case modelpb.MetricType_METRIC_TYPE_GAUGE: dp := m.SetEmptyGauge().DataPoints().AppendEmpty() dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) dp.SetDoubleValue(sample.GetValue()) case modelpb.MetricType_METRIC_TYPE_HISTOGRAM: // TODO histograms case modelpb.MetricType_METRIC_TYPE_SUMMARY: // TODO summaries default: return fmt.Errorf("unhandled metric type %q", sample.GetType()) } } case modelpb.ErrorEventType: rl := ld.ResourceLogs().AppendEmpty() r.elasticErrorToOtelLogRecord(&rl, event, timestamp, ctx) case modelpb.LogEventType: // TODO case modelpb.SpanEventType, modelpb.TransactionEventType: rs := td.ResourceSpans().AppendEmpty() s := r.elasticEventToOtelSpan(&rs, event, timestamp) isTransaction := event.Type() == modelpb.TransactionEventType if isTransaction { r.elasticTransactionToOtelSpan(&s, event) } else { r.elasticSpanToOTelSpan(&s, event) } default: return fmt.Errorf("unhandled event type %q", event.Type()) } } var errs []error if numRecords := ld.LogRecordCount(); numRecords != 0 { ctx := r.obsreport.StartLogsOp(ctx) err := r.nextLogs.ConsumeLogs(ctx, ld) r.obsreport.EndLogsOp(ctx, dataFormatElasticAPM, numRecords, err) errs = append(errs, err) } if numDataPoints := md.DataPointCount(); numDataPoints != 0 { ctx := r.obsreport.StartMetricsOp(ctx) err := r.nextMetrics.ConsumeMetrics(ctx, md) r.obsreport.EndMetricsOp(ctx, dataFormatElasticAPM, numDataPoints, err) errs = append(errs, err) } if numSpans := td.SpanCount(); numSpans != 0 { ctx := r.obsreport.StartTracesOp(ctx) err := r.nextTraces.ConsumeTraces(ctx, td) r.obsreport.EndTracesOp(ctx, dataFormatElasticAPM, numSpans, err) errs = append(errs, err) } return errors.Join(errs...) } func (r *elasticAPMReceiver) elasticErrorToOtelLogRecord(rl *plog.ResourceLogs, event *modelpb.APMEvent, timestamp time.Time, ctx context.Context) { sl := rl.ScopeLogs().AppendEmpty() l := sl.LogRecords().AppendEmpty() mappers.SetTopLevelFieldsLogRecord(event, timestamp, l, r.settings.Logger) mappers.SetDerivedFieldsForError(event, l.Attributes()) mappers.SetDerivedResourceAttributes(event, rl.Resource().Attributes()) mappers.TranslateToOtelResourceAttributes(event, rl.Resource().Attributes()) if event.Error != nil && event.Error.Log != nil { l.Body().SetStr(event.Error.Log.Message) } } func (r *elasticAPMReceiver) elasticEventToOtelSpan(rs *ptrace.ResourceSpans, event *modelpb.APMEvent, timestamp time.Time) ptrace.Span { ss := rs.ScopeSpans().AppendEmpty() s := ss.Spans().AppendEmpty() mappers.SetTopLevelFieldsSpan(event, timestamp, s, r.settings.Logger) mappers.TranslateToOtelResourceAttributes(event, rs.Resource().Attributes()) mappers.SetDerivedFieldsCommon(event, s.Attributes()) mappers.SetDerivedResourceAttributes(event, rs.Resource().Attributes()) r.elasticSpanLinksToOTelSpanLinks(event, s) return s } func (r *elasticAPMReceiver) elasticSpanLinksToOTelSpanLinks(event *modelpb.APMEvent, s ptrace.Span) { if event.Span != nil && event.Span.Links != nil { for _, link := range event.Span.Links { ptraceSpanLink := s.Links().AppendEmpty() traceId, err := mappers.TraceIDFromHex(link.TraceId) if err == nil { ptraceSpanLink.SetTraceID(traceId) } else { r.settings.Logger.Error("failed to parse trace ID from span link", zap.String("trace_id", link.TraceId)) } spanId, err := mappers.SpanIdFromHex(link.SpanId) if err == nil { ptraceSpanLink.SetSpanID(spanId) } else { r.settings.Logger.Error("failed to parse span ID from span link", zap.String("span_id", link.SpanId)) } } } } func (r *elasticAPMReceiver) elasticTransactionToOtelSpan(s *ptrace.Span, event *modelpb.APMEvent) { s.SetName(event.Transaction.Name) mappers.SetDerivedFieldsForTransaction(event, s.Attributes()) transaction := event.GetTransaction() s.SetName(transaction.GetName()) mappers.TranslateIntakeV2TransactionToOTelAttributes(event, s.Attributes()) if event.Http != nil && event.Http.Request != nil { s.SetKind(ptrace.SpanKindServer) } else if event.Message != "" { // this check is TBD s.SetKind(ptrace.SpanKindConsumer) } } func (r *elasticAPMReceiver) elasticSpanToOTelSpan(s *ptrace.Span, event *modelpb.APMEvent) { span := event.GetSpan() s.SetName(span.GetName()) mappers.SetDerivedFieldsForSpan(event, s.Attributes()) mappers.TranslateIntakeV2SpanToOTelAttributes(event, s.Attributes()) if event.Http != nil || event.Message != "" { s.SetKind(ptrace.SpanKindClient) } } type jsonError struct { Message string `json:"message"` Document string `json:"document,omitempty"` }