func()

in metricsgenreceiver/receiver.go [102:147]


// Start launches the metrics generation loop in a background goroutine and
// returns nil immediately.
//
// The goroutine steps a simulated clock from cfg.StartTime up to (but not
// including) cfg.EndTime in increments of cfg.Interval. For every step it
// produces metrics at a jittered timestamp and applies churn. When
// cfg.RealTime is set, each step additionally waits for one tick of a ticker
// running at cfg.Interval. Progress is logged at most once per 10 seconds of
// wall-clock time, and a summary is logged once the simulated clock reaches
// cfg.EndTime. If cfg.ExitAfterEnd is set, a fatal error status is then
// reported to host.
//
// The ctx argument is not used for the loop: the goroutine runs under a
// cancellable context derived from context.Background(), and its cancel
// function is stored in r.cancel (presumably invoked on shutdown; verify
// against the Shutdown implementation). Cancelling that context makes the
// goroutine return without logging the summary or reporting the exit status.
func (r *MetricsGenReceiver) Start(ctx context.Context, host component.Host) error {
	// Wall-clock spacing between two progress log lines.
	const progressLogInterval = 10 * time.Second

	// The generator outlives the Start call, so it gets its own context
	// instead of the one passed in by the caller.
	ctx, r.cancel = context.WithCancel(context.Background())
	go func() {
		start := time.Now()
		nextLog := start.Add(progressLogInterval)
		ticker := time.NewTicker(r.cfg.Interval)
		defer ticker.Stop()
		var dataPoints uint64
		currentTime := r.cfg.StartTime
		for i := 0; currentTime.Before(r.cfg.EndTime); i++ {
			if ctx.Err() != nil {
				return
			}
			if time.Now().After(nextLog) {
				elapsed := time.Since(start)
				// Fraction of the simulated time range that has been covered so far.
				progress := currentTime.Sub(r.cfg.StartTime).Seconds() / r.cfg.EndTime.Sub(r.cfg.StartTime).Seconds()
				// Extrapolate the total runtime from the elapsed time and subtract
				// what has already been spent to get the remaining time.
				eta := time.Duration(float64(elapsed.Nanoseconds())/progress) - elapsed
				r.settings.Logger.Info("generating metrics progress",
					zap.Int("progress_percent", int(progress*100)),
					zap.String("eta", eta.Round(time.Second).String()),
					zap.Uint64("datapoints", dataPoints),
					zap.Float64("data_points_per_second", float64(dataPoints)/elapsed.Seconds()))
				nextLog = nextLog.Add(progressLogInterval)
			}
			simulatedTime := addJitter(currentTime, r.cfg.IntervalJitterStdDev, r.cfg.Interval)
			dataPoints += r.produceMetrics(ctx, simulatedTime)
			r.applyChurn(i, simulatedTime)

			if r.cfg.RealTime {
				// Pace the loop to wall-clock time, but stop waiting as soon as
				// the context is cancelled so that stopping is not delayed by up
				// to a full interval.
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
				}
			}
			currentTime = currentTime.Add(r.cfg.Interval)
		}
		duration := time.Since(start)

		r.settings.Logger.Info("finished generating metrics",
			zap.Uint64("datapoints", dataPoints),
			zap.String("duration", duration.Round(time.Millisecond).String()),
			zap.Float64("data_points_per_second", float64(dataPoints)/duration.Seconds()))
		if r.cfg.ExitAfterEnd {
			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(errors.New("exiting because exit_after_end is set to true")))
		}
	}()

	return nil
}