func()

in receiver/elasticapmreceiver/receiver.go [208:301]


func (r *elasticAPMReceiver) processBatch(ctx context.Context, batch *modelpb.Batch) error {
	ld := plog.NewLogs()
	md := pmetric.NewMetrics()
	td := ptrace.NewTraces()

	gk := modelprocessor.SetGroupingKey{
		NewHash: func() hash.Hash {
			return xxhash.New()
		},
	}

	if err := gk.ProcessBatch(ctx, batch); err != nil {
		r.settings.Logger.Error("failed to process batch", zap.Error(err))
	}

	for _, event := range *batch {
		timestampNanos := event.GetTimestamp()
		timestamp := time.Unix(
			int64(timestampNanos/1e9), // Convert nanoseconds to seconds
			int64(timestampNanos%1e9), // Remainder in nanoseconds
		)

		// TODO record metrics about events processed by type?
		// TODO translate events to pdata types
		switch event.Type() {
		case modelpb.MetricEventType:
			rm := md.ResourceMetrics().AppendEmpty()
			sm := rm.ScopeMetrics().AppendEmpty()
			metricset := event.GetMetricset()

			// TODO interval, doc_count
			// TODO how can we attach metricset.name?
			for _, sample := range metricset.GetSamples() {
				m := sm.Metrics().AppendEmpty()
				m.SetName(sample.GetName())
				m.SetUnit(sample.GetUnit())
				// TODO set attributes (dimensions/labels)
				switch sample.GetType() {
				case modelpb.MetricType_METRIC_TYPE_COUNTER:
					dp := m.SetEmptySum().DataPoints().AppendEmpty()
					dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
					dp.SetDoubleValue(sample.GetValue())
				case modelpb.MetricType_METRIC_TYPE_GAUGE:
					dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
					dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
					dp.SetDoubleValue(sample.GetValue())
				case modelpb.MetricType_METRIC_TYPE_HISTOGRAM:
					// TODO histograms
				case modelpb.MetricType_METRIC_TYPE_SUMMARY:
					// TODO summaries
				default:
					return fmt.Errorf("unhandled metric type %q", sample.GetType())
				}
			}
		case modelpb.ErrorEventType:
			rl := ld.ResourceLogs().AppendEmpty()
			r.elasticErrorToOtelLogRecord(&rl, event, timestamp, ctx)
		case modelpb.LogEventType:
			// TODO
		case modelpb.SpanEventType, modelpb.TransactionEventType:
			rs := td.ResourceSpans().AppendEmpty()
			s := r.elasticEventToOtelSpan(&rs, event, timestamp)

			isTransaction := event.Type() == modelpb.TransactionEventType
			if isTransaction {
				r.elasticTransactionToOtelSpan(&s, event)
			} else {
				r.elasticSpanToOTelSpan(&s, event)
			}
		default:
			return fmt.Errorf("unhandled event type %q", event.Type())
		}
	}
	var errs []error
	if numRecords := ld.LogRecordCount(); numRecords != 0 {
		ctx := r.obsreport.StartLogsOp(ctx)
		err := r.nextLogs.ConsumeLogs(ctx, ld)
		r.obsreport.EndLogsOp(ctx, dataFormatElasticAPM, numRecords, err)
		errs = append(errs, err)
	}
	if numDataPoints := md.DataPointCount(); numDataPoints != 0 {
		ctx := r.obsreport.StartMetricsOp(ctx)
		err := r.nextMetrics.ConsumeMetrics(ctx, md)
		r.obsreport.EndMetricsOp(ctx, dataFormatElasticAPM, numDataPoints, err)
		errs = append(errs, err)
	}
	if numSpans := td.SpanCount(); numSpans != 0 {
		ctx := r.obsreport.StartTracesOp(ctx)
		err := r.nextTraces.ConsumeTraces(ctx, td)
		r.obsreport.EndTracesOp(ctx, dataFormatElasticAPM, numSpans, err)
		errs = append(errs, err)
	}
	return errors.Join(errs...)
}