in receiver/loadgenreceiver/metrics.go [107:157]
func (ar *metricsGenerator) Start(ctx context.Context, _ component.Host) error {
	startCtx, cancelFn := context.WithCancel(ctx)
	ar.cancelFn = cancelFn
	for i := 0; i < ar.cfg.Concurrency; i++ {
		ar.inflightConcurrency.Add(1)
		go func() {
			defer ar.inflightConcurrency.Done()
			next := pmetric.NewMetrics() // per-worker temporary container to avoid allocs
			for {
				select {
				case <-startCtx.Done():
					return
				default:
				}
				if next.IsReadOnly() {
					// The optimization to reuse pdata is not compatible with fanoutconsumer,
					// i.e. pipelines with more than one consumer, because fanoutconsumer marks
					// the pdata struct as read-only, so it cannot be reused and must be reallocated.
					// See https://github.com/open-telemetry/opentelemetry-collector/blob/461a3558086a03ab13ea121d12e28e185a1c79b0/internal/fanoutconsumer/logs.go#L70
					next = pmetric.NewMetrics()
				}
				err := ar.nextMetrics(next)
				if errors.Is(err, list.ErrLoopLimitReached) {
					return
				}
				// For graceful shutdown, use ctx instead of startCtx to shield Consume* from
				// context cancellation. In other words, Consume* finishes at its own pace,
				// which may take indefinitely long.
				if err := ar.consumer.ConsumeMetrics(ctx, next); err != nil {
					ar.logger.Error(err.Error())
					ar.statsMu.Lock()
					ar.stats.FailedRequests++
					ar.stats.FailedMetricDataPoints += next.DataPointCount()
					ar.statsMu.Unlock()
				} else {
					ar.statsMu.Lock()
					ar.stats.Requests++
					ar.stats.MetricDataPoints += next.DataPointCount()
					ar.statsMu.Unlock()
				}
			}
		}()
	}
	go func() {
		// Once all workers have exited, report the final stats to the optional done channel.
		ar.inflightConcurrency.Wait()
		if ar.cfg.Metrics.doneCh != nil {
			ar.cfg.Metrics.doneCh <- ar.stats
		}
	}()
	return nil
}
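
A minimal illustrative sketch (not part of metrics.go) of how a caller might consume the doneCh handoff above: the goroutine wrapping inflightConcurrency.Wait() publishes the aggregated stats exactly once after all workers return, so a test or driver can simply block on the channel. The waitForMetricsStats name and the Stats channel element type are assumptions inferred from this excerpt, not the actual loadgenreceiver API.

func waitForMetricsStats(doneCh <-chan Stats) Stats {
	// Blocks until Start's final goroutine has seen all workers exit and has
	// sent the aggregated stats (hypothetical helper, assumed Stats type).
	return <-doneCh
}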