receiver/otelarrowreceiver/internal/arrow/arrow.go (564 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package arrow // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow" import ( "context" "errors" "io" "runtime" "strings" "sync" "sync/atomic" arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/extension/extensionauth" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/otel" otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/net/http2/hpack" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" ) const ( streamFormat = "arrow" hpackMaxDynamicSize = 4096 ) var ( ErrNoMetricsConsumer = errors.New("no metrics consumer") ErrNoLogsConsumer = errors.New("no logs consumer") ErrNoTracesConsumer = errors.New("no traces consumer") ErrUnrecognizedPayload = consumererror.NewPermanent(errors.New("unrecognized OTel-Arrow payload")) ) type Consumers interface { Traces() consumer.Traces Metrics() consumer.Metrics Logs() consumer.Logs } type Receiver struct { Consumers arrowpb.UnsafeArrowTracesServiceServer arrowpb.UnsafeArrowLogsServiceServer arrowpb.UnsafeArrowMetricsServiceServer telemetry component.TelemetrySettings tracer trace.Tracer obsrecv *receiverhelper.ObsReport gsettings configgrpc.ServerConfig authServer extensionauth.Server newConsumer func() arrowRecord.ConsumerAPI netReporter netstats.Interface boundedQueue admission2.Queue } // receiverStream holds the inFlightWG for a single stream. type receiverStream struct { *Receiver inFlightWG sync.WaitGroup } // New creates a new Receiver reference. func New( cs Consumers, set receiver.Settings, obsrecv *receiverhelper.ObsReport, gsettings configgrpc.ServerConfig, authServer extensionauth.Server, newConsumer func() arrowRecord.ConsumerAPI, bq admission2.Queue, netReporter netstats.Interface, ) (*Receiver, error) { tracer := set.TracerProvider.Tracer("otel-arrow-receiver") return &Receiver{ Consumers: cs, obsrecv: obsrecv, telemetry: set.TelemetrySettings, tracer: tracer, authServer: authServer, newConsumer: newConsumer, gsettings: gsettings, netReporter: netReporter, boundedQueue: bq, }, nil } // headerReceiver contains the state necessary to decode per-request metadata // from an arrow stream. type headerReceiver struct { // decoder maintains state across the stream. decoder *hpack.Decoder // includeMetadata as configured by gRPC settings. includeMetadata bool // hasAuthServer indicates that headers must be produced // independent of includeMetadata. hasAuthServer bool // client connection info from the stream context, (optionally // if includeMetadata) to be extended with per-request metadata. connInfo client.Info // streamHdrs was translated from the incoming context, will be // merged with per-request metadata. Note that the contents of // this map are equivalent to connInfo.Metadata, however that // library does not let us iterate over the map so we recalculate // this from the gRPC incoming stream context. streamHdrs map[string][]string // tmpHdrs is used by the decoder's emit function during Write. tmpHdrs map[string][]string } func newHeaderReceiver(streamCtx context.Context, as extensionauth.Server, includeMetadata bool) *headerReceiver { hr := &headerReceiver{ includeMetadata: includeMetadata, hasAuthServer: as != nil, connInfo: client.FromContext(streamCtx), } // Note that we capture the incoming context if there is an // Auth plugin configured or includeMetadata is set. if hr.includeMetadata || hr.hasAuthServer { if smd, ok := metadata.FromIncomingContext(streamCtx); ok { hr.streamHdrs = smd } } // Note the hpack decoder supports additional protections, // such as SetMaxStringLength(), but as we already have limits // on stream request size, this seems unnecessary. hr.decoder = hpack.NewDecoder(hpackMaxDynamicSize, hr.tmpHdrsAppend) return hr } // combineHeaders calculates per-request Metadata by combining the stream's // client.Info with additional key:values associated with the arrow batch. func (h *headerReceiver) combineHeaders(ctx context.Context, hdrsBytes []byte) (context.Context, map[string][]string, error) { if len(hdrsBytes) == 0 && len(h.streamHdrs) == 0 { // Note: call newContext in this case to ensure that // connInfo is added to the context, for Auth. return h.newContext(ctx, nil), nil, nil } if len(hdrsBytes) == 0 { return h.newContext(ctx, h.streamHdrs), h.streamHdrs, nil } // Note that we will parse the headers even if they are not // used, to check for validity and/or trace context. Also // note this code was once optimized to avoid the following // map allocation in cases where the return value would not be // used. This logic was "is metadata present" or "is auth // server used". Then we added to this, "is trace propagation // in use" and simplified this function to always store the // headers into a temporary map. h.tmpHdrs = map[string][]string{} // Write calls the emitFunc, appending directly into `tmpHdrs`. if _, err := h.decoder.Write(hdrsBytes); err != nil { return ctx, nil, err } // Get the global propagator, to extract context. When there // are no fields, it's a no-op propagator implementation and // we can skip the allocations inside this block. carrier := otel.GetTextMapPropagator() if len(carrier.Fields()) != 0 { // When there are no fields, it's a no-op // implementation and we can skip the allocations. flat := map[string]string{} for _, key := range carrier.Fields() { have := h.tmpHdrs[key] if len(have) > 0 { flat[key] = have[0] delete(h.tmpHdrs, key) } } ctx = carrier.Extract(ctx, propagation.MapCarrier(flat)) } // Add streamHdrs that were not carried in the per-request headers. for k, v := range h.streamHdrs { // Note: This is done after the per-request metadata is defined // in recognition of a potential for duplicated values stemming // from the Arrow exporter's independent call to the Auth // extension's GetRequestMetadata(). This paired with the // headersetter's return of empty-string values means, we would // end up with an empty-string element for any headersetter // `from_context` rules b/c the stream uses background context. // This allows static headers through. // // See https://github.com/open-telemetry/opentelemetry-collector/issues/6965 lk := strings.ToLower(k) if _, ok := h.tmpHdrs[lk]; !ok { h.tmpHdrs[lk] = v } } // Release the temporary copy used in emitFunc(). newHdrs := h.tmpHdrs h.tmpHdrs = nil // Note: newHdrs is passed to the Auth plugin. Whether // newHdrs is set in the context depends on h.includeMetadata. return h.newContext(ctx, newHdrs), newHdrs, nil } // tmpHdrsAppend appends to tmpHdrs, from decoder's emit function. func (h *headerReceiver) tmpHdrsAppend(hf hpack.HeaderField) { if h.tmpHdrs != nil { // We force strings.ToLower to ensure consistency. gRPC itself // does this and would do the same. hn := strings.ToLower(hf.Name) h.tmpHdrs[hn] = append(h.tmpHdrs[hn], hf.Value) } } func (h *headerReceiver) newContext(ctx context.Context, hdrs map[string][]string) context.Context { // Retain the Addr/Auth of the stream connection, update the // per-request metadata from the Arrow batch. var md client.Metadata if h.includeMetadata && hdrs != nil { md = client.NewMetadata(hdrs) } return client.NewContext(ctx, client.Info{ Addr: h.connInfo.Addr, Auth: h.connInfo.Auth, Metadata: md, }) } // logStreamError decides how to log an error. func (r *Receiver) logStreamError(err error, where string) (occode otelcodes.Code, msg string) { var code codes.Code // gRPC tends to supply status-wrapped errors, so we always // unpack them. A wrapped Canceled code indicates intentional // shutdown, which can be due to normal causes (EOF, e.g., // max-stream-lifetime reached) or unusual causes (Canceled, // e.g., because the other stream direction reached an error). if status, ok := status.FromError(err); ok { code = status.Code() msg = status.Message() } else if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { code = codes.Canceled msg = err.Error() } else { code = codes.Internal msg = err.Error() } if code == codes.Canceled { occode = otelcodes.Unset r.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg), zap.String("where", where)) } else { occode = otelcodes.Error r.telemetry.Logger.Error("arrow stream error", zap.Int("code", int(code)), zap.String("message", msg), zap.String("where", where)) } return occode, msg } func gRPCName(desc grpc.ServiceDesc) string { return netstats.GRPCStreamMethodName(desc, desc.Streams[0]) } var ( arrowTracesMethod = gRPCName(arrowpb.ArrowTracesService_ServiceDesc) arrowMetricsMethod = gRPCName(arrowpb.ArrowMetricsService_ServiceDesc) arrowLogsMethod = gRPCName(arrowpb.ArrowLogsService_ServiceDesc) ) func (r *Receiver) ArrowTraces(serverStream arrowpb.ArrowTracesService_ArrowTracesServer) error { return r.anyStream(serverStream, arrowTracesMethod) } func (r *Receiver) ArrowLogs(serverStream arrowpb.ArrowLogsService_ArrowLogsServer) error { return r.anyStream(serverStream, arrowLogsMethod) } func (r *Receiver) ArrowMetrics(serverStream arrowpb.ArrowMetricsService_ArrowMetricsServer) error { return r.anyStream(serverStream, arrowMetricsMethod) } type anyStreamServer interface { Send(*arrowpb.BatchStatus) error Recv() (*arrowpb.BatchArrowRecords, error) grpc.ServerStream } type batchResp struct { id int64 err error } func (r *Receiver) recoverErr(retErr *error) { if err := recover(); err != nil { // When this happens, the stacktrace is // important and lost if we don't capture it // here. r.telemetry.Logger.Error("panic detail in otel-arrow-adapter", zap.Reflect("recovered", err), zap.Stack("stacktrace"), ) *retErr = status.Errorf(codes.Internal, "panic in otel-arrow-adapter: %v", err) } } func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retErr error) { streamCtx := serverStream.Context() ac := r.newConsumer() defer func() { if err := ac.Close(); err != nil { r.telemetry.Logger.Error("arrow stream close", zap.Error(err)) } }() defer r.recoverErr(&retErr) // doneCancel allows an error in the sender/receiver to // interrupt the corresponding thread. doneCtx, doneCancel := context.WithCancel(streamCtx) defer doneCancel() recvErrCh := make(chan error, 1) sendErrCh := make(chan error, 1) pendingCh := make(chan batchResp, runtime.NumCPU()) // wg is used to ensure this thread returns after both // sender and receiver threads return. var sendWG sync.WaitGroup var recvWG sync.WaitGroup sendWG.Add(1) recvWG.Add(1) // flushCtx controls the start of flushing. when this is canceled // after the receiver finishes, the flush operation begins. flushCtx, flushCancel := context.WithCancel(doneCtx) defer flushCancel() rstream := &receiverStream{ Receiver: r, } go func() { var err error defer recvWG.Done() defer r.recoverErr(&err) err = rstream.srvReceiveLoop(doneCtx, serverStream, pendingCh, method, ac) recvErrCh <- err }() go func() { var err error defer sendWG.Done() defer r.recoverErr(&err) // the sender receives flushCtx, which is canceled after the // receiver returns (success or no). err = rstream.srvSendLoop(flushCtx, serverStream, &recvWG, pendingCh) sendErrCh <- err }() // Wait for sender/receiver threads to return before returning. defer recvWG.Wait() defer sendWG.Wait() for { select { case <-doneCtx.Done(): return status.Error(codes.Canceled, "server stream shutdown") case err := <-recvErrCh: flushCancel() if errors.Is(err, io.EOF) { // the receiver returned EOF, next we // expect the sender to finish. continue } return err case err := <-sendErrCh: // explicit cancel here, in case the sender fails before // the receiver does. break the receiver loop here: doneCancel() return err } } } func (r *receiverStream) newInFlightData(ctx context.Context, method string, batchID int64, pendingCh chan<- batchResp) *inFlightData { _, span := r.tracer.Start(ctx, "otel_arrow_stream_inflight") r.inFlightWG.Add(1) id := &inFlightData{ receiverStream: r, method: method, batchID: batchID, pendingCh: pendingCh, span: span, } id.refs.Add(1) return id } // inFlightData is responsible for storing the resources held by one request. type inFlightData struct { // Receiver is the owner of the resources held by this object. *receiverStream method string batchID int64 pendingCh chan<- batchResp span trace.Span // refs counts the number of goroutines holding this object. // initially the recvOne() body, on success the // consumeAndRespond() function. refs atomic.Int32 numItems int // how many items uncompSize int64 // uncompressed data size == how many bytes held in the semaphore releaser admission2.ReleaseFunc } func (id *inFlightData) recvDone(ctx context.Context, recvErrPtr *error) { retErr := *recvErrPtr if retErr != nil { // logStreamError because this response will break the stream. occode, msg := id.logStreamError(retErr, "recv") id.span.SetStatus(occode, msg) } id.anyDone(ctx) } func (id *inFlightData) consumeDone(ctx context.Context, consumeErrPtr *error) { retErr := *consumeErrPtr if retErr != nil { // debug-level because the error was external from the pipeline. id.telemetry.Logger.Debug("otel-arrow consume", zap.Error(retErr)) id.span.SetStatus(otelcodes.Error, retErr.Error()) } id.replyToCaller(ctx, retErr) id.anyDone(ctx) } func (id *inFlightData) replyToCaller(ctx context.Context, callerErr error) { select { case id.pendingCh <- batchResp{ id: id.batchID, err: callerErr, }: // OK: Responded. case <-ctx.Done(): // OK: Never responded due to cancelation. } } func (id *inFlightData) anyDone(ctx context.Context) { // check if there are still refs, in which case leave the in-flight // counts where they are. if id.refs.Add(-1) != 0 { return } id.span.End() if id.releaser != nil { id.releaser() } // The netstats code knows that uncompressed size is // unreliable for arrow transport, so we instrument it // directly here. Only the primary direction of transport // is instrumented this way. var sized netstats.SizesStruct sized.Method = id.method sized.Length = id.uncompSize id.netReporter.CountReceive(ctx, sized) id.inFlightWG.Done() } // recvOne begins processing a single Arrow batch. // // If an error is encountered before Arrow data is successfully consumed, // the stream will break and the error will be returned immediately. // // If the error is due to authorization, the stream remains unbroken // and the request fails. // // If not enough resources are available, the stream will block (if // waiting permitted) or break (insufficient waiters). // // Assuming success, a new goroutine is created to handle consuming the // data. // // This handles constructing an inFlightData object, which itself // tracks everything that needs to be used by instrumentation when the // batch finishes. func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStreamServer, hrcv *headerReceiver, pendingCh chan<- batchResp, method string, ac arrowRecord.ConsumerAPI) (retErr error) { // Receive a batch corresponding with one ptrace.Traces, pmetric.Metrics, // or plog.Logs item. req, recvErr := serverStream.Recv() // the incoming stream context is the parent of the in-flight context, which // carries a span covering sequential stream-processing work. the context // is severed at this point, with flight.span a contextless child that will be // finished in recvDone(). flight := r.newInFlightData(streamCtx, method, req.GetBatchId(), pendingCh) // inflightCtx is carried through into consumeAndProcess on the success path. // this inherits the stream context so that its auth headers are present // when the per-data Auth call is made. inflightCtx := streamCtx defer flight.recvDone(inflightCtx, &retErr) if recvErr != nil { if errors.Is(recvErr, io.EOF) { return recvErr } else if errors.Is(recvErr, context.Canceled) { // This is a special case to avoid introducing a span error // for a canceled operation. return io.EOF } else if status, ok := status.FromError(recvErr); ok && status.Code() == codes.Canceled { // This is a special case to avoid introducing a span error // for a canceled operation. return io.EOF } // Note: err is directly from gRPC, should already have status. return recvErr } // Check for optional headers and set the incoming context. inflightCtx, authHdrs, hdrErr := hrcv.combineHeaders(inflightCtx, req.GetHeaders()) if hdrErr != nil { // Failing to parse the incoming headers breaks the stream. return status.Errorf(codes.Internal, "arrow metadata error: %v", hdrErr) } // start this span after hrcv.combineHeaders returns extracted context. This will allow this span // to be a part of the data path trace instead of only being included as a child of the stream inflight trace. inflightCtx, span := r.tracer.Start(inflightCtx, "otel_arrow_stream_recv") defer span.End() // Authorize the request, if configured, prior to acquiring resources. if r.authServer != nil { var authErr error inflightCtx, authErr = r.authServer.Authenticate(inflightCtx, authHdrs) if authErr != nil { flight.replyToCaller(inflightCtx, status.Error(codes.Unauthenticated, authErr.Error())) return nil } } var callerCancel context.CancelFunc if encodedTimeout, has := authHdrs["grpc-timeout"]; has && len(encodedTimeout) == 1 { if timeout, decodeErr := grpcutil.DecodeTimeout(encodedTimeout[0]); decodeErr != nil { r.telemetry.Logger.Debug("grpc-timeout parse error", zap.Error(decodeErr)) } else { // timeout parsed successfully inflightCtx, callerCancel = context.WithTimeout(inflightCtx, timeout) // if we return before the new goroutine is started below // cancel the context. callerCancel will be non-nil until // the new goroutine is created at the end of this function. defer func() { if callerCancel != nil { callerCancel() } }() } } data, numItems, uncompSize, consumeErr := r.consumeBatch(ac, req) if consumeErr != nil { if errors.Is(consumeErr, arrowRecord.ErrConsumerMemoryLimit) { return status.Errorf(codes.ResourceExhausted, "otel-arrow decode: %v", consumeErr) } return status.Errorf(codes.Internal, "otel-arrow decode: %v", consumeErr) } // Use the bounded queue to memory limit based on incoming // uncompressed request size and waiters. Acquire will fail // immediately if there are too many waiters, or will // otherwise block until timeout or enough memory becomes // available. releaser, acquireErr := r.boundedQueue.Acquire(inflightCtx, uint64(uncompSize)) if acquireErr != nil { return acquireErr } flight.uncompSize = uncompSize flight.numItems = numItems flight.releaser = releaser // Recognize that the request is still in-flight via consumeAndRespond() flight.refs.Add(1) // consumeAndRespond consumes the data and returns control to the sender loop. go func(callerCancel context.CancelFunc) { if callerCancel != nil { defer callerCancel() } r.consumeAndRespond(inflightCtx, streamCtx, data, flight) }(callerCancel) // Reset callerCancel so the deferred function above does not call it here. callerCancel = nil return nil } // consumeAndRespond finishes the span started in recvOne and logs the // result after invoking the pipeline to consume the data. func (r *Receiver) consumeAndRespond(ctx, streamCtx context.Context, data any, flight *inFlightData) { var err error defer flight.consumeDone(streamCtx, &err) // recoverErr is a special function because it recovers panics, so we // keep it in a separate defer than the processing above, which will // run after the panic is recovered into an ordinary error. defer r.recoverErr(&err) err = r.consumeData(ctx, data, flight) } // srvReceiveLoop repeatedly receives one batch of data. func (r *receiverStream) srvReceiveLoop(ctx context.Context, serverStream anyStreamServer, pendingCh chan<- batchResp, method string, ac arrowRecord.ConsumerAPI) (retErr error) { hrcv := newHeaderReceiver(ctx, r.authServer, r.gsettings.IncludeMetadata) for { select { case <-ctx.Done(): return status.Error(codes.Canceled, "server stream shutdown") default: if err := r.recvOne(ctx, serverStream, hrcv, pendingCh, method, ac); err != nil { return err } } } } // srvReceiveLoop repeatedly sends one batch data response. func (r *receiverStream) sendOne(serverStream anyStreamServer, resp batchResp) error { // Note: Statuses can be batched, but we do not take // advantage of this feature. bs := &arrowpb.BatchStatus{ BatchId: resp.id, } if resp.err == nil { bs.StatusCode = arrowpb.StatusCode_OK } else { // Generally, code in the receiver should use // status.Errorf(codes.XXX, ...) so that we take the // first branch. if gsc, ok := status.FromError(resp.err); ok { bs.StatusCode = arrowpb.StatusCode(gsc.Code()) bs.StatusMessage = gsc.Message() } else { // Ideally, we don't take this branch because all code uses // gRPC status constructors and we've taken the branch above. // // This is a fallback for several broad categories of error. bs.StatusMessage = resp.err.Error() switch { case consumererror.IsPermanent(resp.err): // Some kind of pipeline error, somewhere downstream. r.telemetry.Logger.Error("arrow data error", zap.Error(resp.err)) bs.StatusCode = arrowpb.StatusCode_INVALID_ARGUMENT default: // Probably a pipeline error, retryable. r.telemetry.Logger.Debug("arrow consumer error", zap.Error(resp.err)) bs.StatusCode = arrowpb.StatusCode_UNAVAILABLE } } } if err := serverStream.Send(bs); err != nil { // logStreamError because this response will break the stream. _, _ = r.logStreamError(err, "send") return err } return nil } func (r *receiverStream) flushSender(serverStream anyStreamServer, recvWG *sync.WaitGroup, pendingCh <-chan batchResp) error { // wait to ensure no more items are accepted recvWG.Wait() // wait for all responses to be sent r.inFlightWG.Wait() for { select { case resp := <-pendingCh: if err := r.sendOne(serverStream, resp); err != nil { return err } default: // Currently nothing left in pendingCh. return nil } } } func (r *receiverStream) srvSendLoop(ctx context.Context, serverStream anyStreamServer, recvWG *sync.WaitGroup, pendingCh <-chan batchResp) error { for { select { case <-ctx.Done(): return r.flushSender(serverStream, recvWG, pendingCh) case resp := <-pendingCh: if err := r.sendOne(serverStream, resp); err != nil { return err } } } } // consumeBatch applies the batch to the Arrow Consumer, returns a // slice of pdata objects of the corresponding data type as `any`. // along with the number of items and true uncompressed size. func (r *Receiver) consumeBatch(arrowConsumer arrowRecord.ConsumerAPI, records *arrowpb.BatchArrowRecords) (retData any, numItems int, uncompSize int64, retErr error) { payloads := records.GetArrowPayloads() if len(payloads) == 0 { return nil, 0, 0, nil } switch payloads[0].Type { case arrowpb.ArrowPayloadType_UNIVARIATE_METRICS: if r.Metrics() == nil { return nil, 0, 0, status.Error(codes.Unimplemented, "metrics service not available") } var sizer pmetric.ProtoMarshaler data, err := arrowConsumer.MetricsFrom(records) if err == nil { for _, metrics := range data { numItems += metrics.DataPointCount() uncompSize += int64(sizer.MetricsSize(metrics)) } } retData = data retErr = err case arrowpb.ArrowPayloadType_LOGS: if r.Logs() == nil { return nil, 0, 0, status.Error(codes.Unimplemented, "logs service not available") } var sizer plog.ProtoMarshaler data, err := arrowConsumer.LogsFrom(records) if err == nil { for _, logs := range data { numItems += logs.LogRecordCount() uncompSize += int64(sizer.LogsSize(logs)) } } retData = data retErr = err case arrowpb.ArrowPayloadType_SPANS: if r.Traces() == nil { return nil, 0, 0, status.Error(codes.Unimplemented, "traces service not available") } var sizer ptrace.ProtoMarshaler data, err := arrowConsumer.TracesFrom(records) if err == nil { for _, traces := range data { numItems += traces.SpanCount() uncompSize += int64(sizer.TracesSize(traces)) } } retData = data retErr = err default: retErr = ErrUnrecognizedPayload } return retData, numItems, uncompSize, retErr } // consumeData invokes the next pipeline consumer for a received batch of data. // it uses the standard OTel collector instrumentation (receiverhelper.ObsReport). // // if any errors are permanent, returns a permanent error. func (r *Receiver) consumeData(ctx context.Context, data any, flight *inFlightData) (retErr error) { oneOp := func(err error) { retErr = multierr.Append(retErr, err) } var final func(context.Context, string, int, error) switch items := data.(type) { case []pmetric.Metrics: ctx = r.obsrecv.StartMetricsOp(ctx) for _, metrics := range items { oneOp(r.Metrics().ConsumeMetrics(ctx, metrics)) } final = r.obsrecv.EndMetricsOp case []plog.Logs: ctx = r.obsrecv.StartLogsOp(ctx) for _, logs := range items { oneOp(r.Logs().ConsumeLogs(ctx, logs)) } final = r.obsrecv.EndLogsOp case []ptrace.Traces: ctx = r.obsrecv.StartTracesOp(ctx) for _, traces := range items { oneOp(r.Traces().ConsumeTraces(ctx, traces)) } final = r.obsrecv.EndTracesOp default: retErr = ErrUnrecognizedPayload } if final != nil { final(ctx, streamFormat, flight.numItems, retErr) } return retErr }