func()

in pkg/export/export.go [720:806]


func (e *Exporter) Run() error {
	// Note: We don't expect the NopExporter to call this. Only the main binary calls this.
	defer e.close()
	go e.seriesCache.run(e.ctx)
	go e.lease.Run(e.ctx)

	timer := time.NewTimer(batchDelayMax)
	stopTimer := func() {
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
	}
	defer stopTimer()

	e.mtx.RLock()
	opts := e.opts
	e.mtx.RUnlock()

	curBatch := newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)

	// Send the currently accumulated batch to GCM asynchronously.
	send := func() {
		e.mtx.RLock()
		opts := e.opts
		sendFunc := e.metricClient.CreateTimeSeries
		e.mtx.RUnlock()

		// Send the batch and once it completed, trigger next to process remaining data in the
		// shards that were part of the batch. This ensures that if we didn't take all samples
		// from a shard when filling the batch, we'll come back for them and any queue built-up
		// gets sent eventually.
		go func(ctx context.Context, b *batch) {
			if !opts.Disable {
				b.send(ctx, sendFunc)
			}
			// We could only trigger if we didn't fully empty shards in this batch.
			// Benchmarking showed no beneficial impact of this optimization.
			e.triggerNext()
		}(e.ctx, curBatch)

		// Reset state for new batch.
		stopTimer()
		timer.Reset(batchDelayMax)

		curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
	}

	for {
		select {
		// NOTE(freinartz): we will terminate once context is cancelled and not flush remaining
		// buffered data. In-flight requests will be aborted as well.
		// This is fine once we persist data submitted via Export() but for now there may be some
		// data loss on shutdown.
		case <-e.ctx.Done():
			return nil
		// This is activated for each new sample that arrives
		case <-e.nextc:
			sendIterations.Inc()

			// Drain shards to fill up the batch.
			//
			// If the shard count is high given the overall throughput, a lot of shards may
			// be packed into the same batch. A slow request will then block all those shards
			// from further parallel sends.
			// If this becomes a problem (especially when we grow maximum batch size), consider
			// adding a heuristic to send partial batches in favor of limiting the number of
			// shards they span.
			for _, shard := range e.shards {
				shard.fill(curBatch)
				if curBatch.full() {
					send()
				}
			}

		case <-timer.C:
			// Flush batch that has been pending for too long.
			if !curBatch.empty() {
				send()
			} else {
				timer.Reset(batchDelayMax)
			}
		}
	}
}