in metricsgenreceiver/receiver.go [102:147]
// Start launches a background goroutine that generates metrics for every
// simulated timestamp from cfg.StartTime (inclusive) up to cfg.EndTime
// (exclusive), advancing by cfg.Interval each step. It returns immediately
// with a nil error; the generation itself runs asynchronously.
//
// The goroutine stops early when the cancel function stored in r.cancel is
// invoked. When cfg.RealTime is set, each step additionally waits for one tick
// of a cfg.Interval ticker, so simulated time advances at wall-clock pace.
// When the full range has been generated and cfg.ExitAfterEnd is set, a fatal
// error status event is reported on host.
func (r *MetricsGenReceiver) Start(ctx context.Context, host component.Host) error {
	// NOTE(review): the incoming ctx is replaced with a fresh background
	// context, presumably because the context handed to Start is not meant to
	// outlive the Start call — confirm against the collector's component docs.
	ctx = context.Background()
	ctx, r.cancel = context.WithCancel(ctx)
	go func() {
		// Minimum wall-clock spacing between two progress log lines.
		const logInterval = 10 * time.Second

		start := time.Now()
		nextLog := start.Add(logInterval)
		ticker := time.NewTicker(r.cfg.Interval)
		defer ticker.Stop()
		dataPoints := uint64(0)
		currentTime := r.cfg.StartTime
		for i := 0; currentTime.Before(r.cfg.EndTime); i++ {
			if ctx.Err() != nil {
				return
			}
			if time.Now().After(nextLog) {
				elapsed := time.Since(start)
				// Fraction of the simulated time range that has been covered so far.
				progress := currentTime.Sub(r.cfg.StartTime).Seconds() / r.cfg.EndTime.Sub(r.cfg.StartTime).Seconds()
				// Extrapolate the remaining wall-clock time from the elapsed time.
				// Without progress the division would yield +Inf, which cannot be
				// converted to a time.Duration in a well-defined way.
				eta := "unknown"
				if progress > 0 {
					eta = (time.Duration(float64(elapsed.Nanoseconds())/progress) - elapsed).Round(time.Second).String()
				}
				r.settings.Logger.Info("generating metrics progress",
					zap.Int("progress_percent", int(progress*100)),
					zap.String("eta", eta),
					zap.Uint64("datapoints", dataPoints),
					zap.Float64("data_points_per_second", float64(dataPoints)/elapsed.Seconds()))
				nextLog = nextLog.Add(logInterval)
			}
			simulatedTime := addJitter(currentTime, r.cfg.IntervalJitterStdDev, r.cfg.Interval)
			dataPoints += r.produceMetrics(ctx, simulatedTime)
			r.applyChurn(i, simulatedTime)
			if r.cfg.RealTime {
				// Pace the loop with the ticker, but stop waiting as soon as the
				// context is cancelled instead of blocking until the next tick.
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
				}
			}
			currentTime = currentTime.Add(r.cfg.Interval)
		}
		duration := time.Since(start)
		r.settings.Logger.Info("finished generating metrics",
			zap.Uint64("datapoints", dataPoints),
			zap.String("duration", duration.Round(time.Millisecond).String()),
			zap.Float64("data_points_per_second", float64(dataPoints)/duration.Seconds()))
		if r.cfg.ExitAfterEnd {
			// Report a fatal error event on the host once generation is complete.
			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(errors.New("exiting because exit_after_end is set to true")))
		}
	}()
	return nil
}