func()

in agent/agent.go [244:323]


func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error {
	// Inelegant, but this sleep is to allow the Gather threads to run, so that
	// the flusher will flush after metrics are collected.
	time.Sleep(time.Millisecond * 300)

	// create an output metric channel and a gorouting that continously passes
	// each metric onto the output plugins & aggregators.
	outMetricC := make(chan telegraf.Metric, 100)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-shutdown:
				if len(outMetricC) > 0 {
					// keep going until outMetricC is flushed
					continue
				}
				return
			case m := <-outMetricC:
				// if dropOriginal is set to true, then we will only send this
				// metric to the aggregators, not the outputs.
				var dropOriginal bool
				if !m.IsAggregate() {
					for _, agg := range a.Config.Aggregators {
						if ok := agg.Add(m.Copy()); ok {
							dropOriginal = true
						}
					}
				}
				if !dropOriginal {
					for i, o := range a.Config.Outputs {
						if i == len(a.Config.Outputs)-1 {
							o.AddMetric(m)
						} else {
							o.AddMetric(m.Copy())
						}
					}
				}
			}
		}
	}()

	ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
	semaphore := make(chan struct{}, 1)
	for {
		select {
		case <-shutdown:
			log.Println("I! Hang on, flushing any cached metrics before shutdown")
			// wait for outMetricC to get flushed before flushing outputs
			wg.Wait()
			a.flush()
			return nil
		case <-ticker.C:
			go func() {
				select {
				case semaphore <- struct{}{}:
					internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
					a.flush()
					<-semaphore
				default:
					// skipping this flush because one is already happening
					log.Println("W! Skipping a scheduled flush because there is" +
						" already a flush ongoing.")
				}
			}()
		case metric := <-metricC:
			// NOTE potential bottleneck here as we put each metric through the
			// processors serially.
			mS := []telegraf.Metric{metric}
			for _, processor := range a.Config.Processors {
				mS = processor.Apply(mS...)
			}
			for _, m := range mS {
				outMetricC <- m
			}
		}
	}
}