in pkg/export/export.go [720:806]
func (e *Exporter) Run() error {
	// Note: We don't expect the NopExporter to call this. Only the main binary calls this.
	defer e.close()
	go e.seriesCache.run(e.ctx)
	go e.lease.Run(e.ctx)

	timer := time.NewTimer(batchDelayMax)
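	// stopTimer stops the timer and, if it already fired, drains its channel.
	// This is the standard time.Timer idiom that makes a later Reset safe:
	// without the drain, a stale expiration could wake the loop spuriously.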
	stopTimer := func() {
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
	}
	defer stopTimer()
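	// Take a consistent snapshot of the options under the read lock; they may be
	// replaced concurrently, e.g. on configuration reload.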
	e.mtx.RLock()
	opts := e.opts
	e.mtx.RUnlock()

	curBatch := newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
	// Send the currently accumulated batch to GCM asynchronously.
	send := func() {
		e.mtx.RLock()
		opts := e.opts
		sendFunc := e.metricClient.CreateTimeSeries
		e.mtx.RUnlock()

		// Send the batch and, once it has completed, trigger the next iteration to
		// process remaining data in the shards that were part of the batch. This
		// ensures that if we didn't take all samples from a shard when filling the
		// batch, we come back for them and any built-up queue gets sent eventually.
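		// curBatch is handed to the goroutine as an argument so that it keeps
		// operating on this batch while curBatch is re-assigned below.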
		go func(ctx context.Context, b *batch) {
			if !opts.Disable {
				b.send(ctx, sendFunc)
			}
			// We could trigger only if the batch did not fully empty its shards,
			// but benchmarking showed no beneficial impact of that optimization.
			e.triggerNext()
		}(e.ctx, curBatch)

		// Reset state for the next batch.
		stopTimer()
		timer.Reset(batchDelayMax)
		curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
	}
	for {
		select {
		// NOTE(freinartz): we will terminate once the context is cancelled and not
		// flush remaining buffered data. In-flight requests will be aborted as well.
		// This is fine once we persist data submitted via Export(), but for now there
		// may be some data loss on shutdown.
		case <-e.ctx.Done():
			return nil

		// This case is triggered for each new sample that arrives.
		case <-e.nextc:
			sendIterations.Inc()
			// Drain shards to fill up the batch.
			//
			// If the shard count is high relative to the overall throughput, many
			// shards may be packed into the same batch. A slow request will then block
			// all of those shards from further parallel sends.
			// If this becomes a problem (especially as the maximum batch size grows),
			// consider a heuristic that sends partial batches in favor of limiting the
			// number of shards they span.
			for _, shard := range e.shards {
				shard.fill(curBatch)
				if curBatch.full() {
					send()
				}
			}
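
		// A batch that is only partially filled after draining all shards is left
		// to the flush timer below.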
		case <-timer.C:
			// Flush a batch that has been pending for too long.
			if !curBatch.empty() {
				send()
			} else {
				timer.Reset(batchDelayMax)
			}
		}
	}
}
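
// Illustrative caller sketch (an assumption for illustration, not code from this
// file): the main binary would typically start Run in its own goroutine and stop
// it by cancelling the context the exporter was created with, e.g.:
//
//	go func() {
//		if err := exporter.Run(); err != nil {
//			log.Println("exporter loop failed:", err) // hypothetical error handling
//		}
//	}()
//	// ...on shutdown:
//	cancel() // cancelling e.ctx makes Run return nil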