// exporter/elasticsearchexporter/exporter.go [185:346]
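// pushMetricsData routes, groups, encodes, and bulk-indexes one batch of
// OTLP metrics. Data points are grouped by target index and by an identity
// hash so that points belonging in the same Elasticsearch document are
// encoded together. Per-data-point validation failures are logged rather
// than returned, so that upstream components do not retry data that can
// never be indexed.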
func (e *elasticsearchExporter) pushMetricsData(
ctx context.Context,
metrics pmetric.Metrics,
) error {
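	// Resolve the mapping mode for this request; it determines document
	// routing, data point hashing, encoding, and which bulk indexer the
	// documents go to.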
mappingMode, err := e.getMappingMode(ctx)
if err != nil {
return err
}
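	// Router, hasher, and encoder are all mapping-mode specific: the router
	// picks a target index per data point, the hasher decides which data
	// points may share a document, and the encoder renders a group of data
	// points into a document body.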
router := newDocumentRouter(mappingMode, e.index, e.config)
hasher := newDataPointHasher(mappingMode)
encoder, err := e.getEncoder(mappingMode)
if err != nil {
return err
}
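	// Each mapping mode has its own bulk indexer. The session collects this
	// request's documents; the deferred End ensures it is released on every
	// return path.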
session, err := e.bulkIndexers.modes[mappingMode].StartSession(ctx)
if err != nil {
return err
}
defer session.End()
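	// Group data points by target index first and by identity hash second;
	// each resulting group is later encoded into a single document.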
groupedDataPointsByIndex := make(map[elasticsearch.Index]map[uint32]*dataPointsGroup)
var validationErrs []error // log instead of returning these so that upstream does not retry
var errs []error
resourceMetrics := metrics.ResourceMetrics()
for i := 0; i < resourceMetrics.Len(); i++ {
resourceMetric := resourceMetrics.At(i)
resource := resourceMetric.Resource()
scopeMetrics := resourceMetric.ScopeMetrics()
		for j := 0; j < scopeMetrics.Len(); j++ {
			scopeMetric := scopeMetrics.At(j)
			scope := scopeMetric.Scope()
			for k := 0; k < scopeMetric.Metrics().Len(); k++ {
				metric := scopeMetric.Metrics().At(k)
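				// upsertDataPoint routes one data point to its target index
				// and merges it into the group for its identity hash,
				// creating the group on first use.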
upsertDataPoint := func(dp datapoints.DataPoint) error {
index, err := router.routeDataPoint(resource, scope, dp.Attributes())
if err != nil {
return err
}
groupedDataPoints, ok := groupedDataPointsByIndex[index]
if !ok {
groupedDataPoints = make(map[uint32]*dataPointsGroup)
groupedDataPointsByIndex[index] = groupedDataPoints
}
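					// Data points that hash equal (same resource, scope, and
					// data point identity) land in the same group and hence
					// in the same document.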
dpHash := hasher.hashDataPoint(resource, scope, dp)
dpGroup, ok := groupedDataPoints[dpHash]
if !ok {
groupedDataPoints[dpHash] = &dataPointsGroup{
resource: resource,
resourceSchemaURL: resourceMetric.SchemaUrl(),
scope: scope,
							scopeSchemaURL: scopeMetric.SchemaUrl(),
dataPoints: []datapoints.DataPoint{dp},
}
} else {
dpGroup.addDataPoint(dp)
}
return nil
}
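				// Fan out by metric type, wrapping every data point in its
				// type-specific datapoints implementation. Metric types not
				// handled here (e.g. empty metrics) are skipped.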
switch metric.Type() {
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(datapoints.NewNumber(metric, dp)); err != nil {
validationErrs = append(validationErrs, err)
continue
}
}
case pmetric.MetricTypeGauge:
dps := metric.Gauge().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(datapoints.NewNumber(metric, dp)); err != nil {
validationErrs = append(validationErrs, err)
continue
}
}
case pmetric.MetricTypeExponentialHistogram:
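					// Only delta temporality is supported for exponential
					// histograms; cumulative ones are dropped and surfaced as
					// validation errors so that upstream does not retry them.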
if metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
validationErrs = append(validationErrs, fmt.Errorf("dropping cumulative temporality exponential histogram %q", metric.Name()))
continue
}
dps := metric.ExponentialHistogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(datapoints.NewExponentialHistogram(metric, dp)); err != nil {
validationErrs = append(validationErrs, err)
continue
}
}
case pmetric.MetricTypeHistogram:
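					// Explicit-bounds histograms are subject to the same
					// delta-only restriction.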
if metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
validationErrs = append(validationErrs, fmt.Errorf("dropping cumulative temporality histogram %q", metric.Name()))
continue
}
dps := metric.Histogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(datapoints.NewHistogram(metric, dp)); err != nil {
validationErrs = append(validationErrs, err)
continue
}
}
case pmetric.MetricTypeSummary:
dps := metric.Summary().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(datapoints.NewSummary(metric, dp)); err != nil {
validationErrs = append(validationErrs, err)
continue
}
}
}
}
}
}
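	// Encode each group into a pooled buffer and queue it for bulk indexing;
	// one group becomes one Elasticsearch document.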
for index, groupedDataPoints := range groupedDataPointsByIndex {
for _, dpGroup := range groupedDataPoints {
buf := e.bufferPool.NewPooledBuffer()
dynamicTemplates, err := encoder.encodeMetrics(
encodingContext{
resource: dpGroup.resource,
resourceSchemaURL: dpGroup.resourceSchemaURL,
scope: dpGroup.scope,
scopeSchemaURL: dpGroup.scopeSchemaURL,
},
dpGroup.dataPoints,
&validationErrs,
index,
buf.Buffer,
)
if err != nil {
buf.Recycle()
errs = append(errs, err)
continue
}
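			// Hand the encoded document to the bulk session as a create
			// action against the routed index.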
if err := session.Add(ctx, index.Index, "", "", buf, dynamicTemplates, docappender.ActionCreate); err != nil {
				// don't recycle the buffer after Add returns an error, since we can't know whether it has already been recycled
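				// Context cancellation takes precedence over the Add error:
				// abort the whole push instead of recording a partial failure.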
if cerr := ctx.Err(); cerr != nil {
return cerr
}
errs = append(errs, err)
}
}
}
if len(validationErrs) > 0 {
e.set.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...)))
}
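	// Flush sends any documents still buffered in the session; context
	// cancellation again takes precedence over the flush error.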
if err := session.Flush(ctx); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
errs = append(errs, err)
}
return errors.Join(errs...)
}
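
// For orientation, a minimal sketch of the dataPointsGroup type used above,
// inferred purely from its usage in this function (the field names and the
// addDataPoint method); the actual definition lives elsewhere in this
// package and may differ:
//
//	type dataPointsGroup struct {
//		resource          pcommon.Resource
//		resourceSchemaURL string
//		scope             pcommon.InstrumentationScope
//		scopeSchemaURL    string
//		dataPoints        []datapoints.DataPoint
//	}
//
//	func (g *dataPointsGroup) addDataPoint(dp datapoints.DataPoint) {
//		g.dataPoints = append(g.dataPoints, dp)
//	}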