in receiver/elasticapmreceiver/receiver.go [208:301]
// processBatch translates the events in batch into OpenTelemetry pdata and
// hands the resulting logs, metrics and traces to the next consumers.
//
// Grouping keys are computed for the batch first; a failure there is only
// logged and translation continues. Each event is then dispatched on its type:
//
//   - metric events: counter samples become sums and gauge samples become
//     gauges; histogram and summary samples are not translated yet and are
//     skipped without emitting a metric.
//   - error events: converted by elasticErrorToOtelLogRecord.
//   - log events: not translated yet.
//   - span and transaction events: converted to spans.
//
// An event or metric sample of any other type aborts the whole batch with an
// error before anything is forwarded. Otherwise each non-empty signal is sent
// to its consumer, and the consumer errors are returned joined together (nil
// when every consumer succeeded).
func (r *elasticAPMReceiver) processBatch(ctx context.Context, batch *modelpb.Batch) error {
	ld := plog.NewLogs()
	md := pmetric.NewMetrics()
	td := ptrace.NewTraces()

	gk := modelprocessor.SetGroupingKey{
		NewHash: func() hash.Hash {
			return xxhash.New()
		},
	}
	// Best effort: a grouping key failure is logged and the batch is still
	// translated.
	if err := gk.ProcessBatch(ctx, batch); err != nil {
		r.settings.Logger.Error("failed to process batch", zap.Error(err))
	}

	for _, event := range *batch {
		// The event timestamp is in nanoseconds; split it into whole seconds
		// and the nanosecond remainder for time.Unix.
		timestampNanos := event.GetTimestamp()
		timestamp := time.Unix(
			int64(timestampNanos/1e9),
			int64(timestampNanos%1e9),
		)

		// TODO record metrics about events processed by type?
		// TODO translate events to pdata types
		switch event.Type() {
		case modelpb.MetricEventType:
			rm := md.ResourceMetrics().AppendEmpty()
			sm := rm.ScopeMetrics().AppendEmpty()
			metricset := event.GetMetricset()
			// Every sample of the metricset shares the event timestamp.
			sampleTime := pcommon.NewTimestampFromTime(timestamp)

			// TODO interval, doc_count
			// TODO how can we attach metricset.name?
			for _, sample := range metricset.GetSamples() {
				sampleType := sample.GetType()

				// Decide whether the sample can be translated before a metric
				// is appended, so that unsupported samples do not leave behind
				// a named metric without any data type or data points.
				switch sampleType {
				case modelpb.MetricType_METRIC_TYPE_COUNTER,
					modelpb.MetricType_METRIC_TYPE_GAUGE:
					// Translated below.
				case modelpb.MetricType_METRIC_TYPE_HISTOGRAM:
					// TODO histograms
					continue
				case modelpb.MetricType_METRIC_TYPE_SUMMARY:
					// TODO summaries
					continue
				default:
					return fmt.Errorf("unhandled metric type %q", sampleType)
				}

				m := sm.Metrics().AppendEmpty()
				m.SetName(sample.GetName())
				m.SetUnit(sample.GetUnit())
				// TODO set attributes (dimensions/labels)

				var dps pmetric.NumberDataPointSlice
				if sampleType == modelpb.MetricType_METRIC_TYPE_COUNTER {
					dps = m.SetEmptySum().DataPoints()
				} else {
					dps = m.SetEmptyGauge().DataPoints()
				}
				dp := dps.AppendEmpty()
				dp.SetTimestamp(sampleTime)
				dp.SetDoubleValue(sample.GetValue())
			}
		case modelpb.ErrorEventType:
			rl := ld.ResourceLogs().AppendEmpty()
			r.elasticErrorToOtelLogRecord(&rl, event, timestamp, ctx)
		case modelpb.LogEventType:
			// TODO
		case modelpb.SpanEventType, modelpb.TransactionEventType:
			rs := td.ResourceSpans().AppendEmpty()
			s := r.elasticEventToOtelSpan(&rs, event, timestamp)
			if event.Type() == modelpb.TransactionEventType {
				r.elasticTransactionToOtelSpan(&s, event)
			} else {
				r.elasticSpanToOTelSpan(&s, event)
			}
		default:
			return fmt.Errorf("unhandled event type %q", event.Type())
		}
	}

	// Forward each signal that produced data. A failure of one consumer does
	// not stop the remaining signals from being delivered; nil entries are
	// dropped by errors.Join.
	var errs []error
	if numRecords := ld.LogRecordCount(); numRecords != 0 {
		ctx := r.obsreport.StartLogsOp(ctx)
		err := r.nextLogs.ConsumeLogs(ctx, ld)
		r.obsreport.EndLogsOp(ctx, dataFormatElasticAPM, numRecords, err)
		errs = append(errs, err)
	}
	if numDataPoints := md.DataPointCount(); numDataPoints != 0 {
		ctx := r.obsreport.StartMetricsOp(ctx)
		err := r.nextMetrics.ConsumeMetrics(ctx, md)
		r.obsreport.EndMetricsOp(ctx, dataFormatElasticAPM, numDataPoints, err)
		errs = append(errs, err)
	}
	if numSpans := td.SpanCount(); numSpans != 0 {
		ctx := r.obsreport.StartTracesOp(ctx)
		err := r.nextTraces.ConsumeTraces(ctx, td)
		r.obsreport.EndTracesOp(ctx, dataFormatElasticAPM, numSpans, err)
		errs = append(errs, err)
	}
	return errors.Join(errs...)
}