in appender.go [479:611]
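// runActiveIndexer is the event loop run by each active indexer goroutine.
// It drains queued documents from a.bulkItems into a BulkIndexer taken from
// the pool, flushes that indexer when FlushInterval elapses or FlushBytes is
// reached, and takes part in scale up/down decisions until the Appender is
// closed.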
func (a *Appender) runActiveIndexer() {
	var closed bool
	var active *BulkIndexer
	var timedFlush uint
	var fullFlush uint
	flushTimer := time.NewTimer(a.config.FlushInterval)
	// Stop and drain the timer so it only starts counting once it is reset,
	// i.e. after the first document has been added to an active indexer.
	if !flushTimer.Stop() {
		<-flushTimer.C
	}
	var firstDocTS time.Time
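	// handleBulkItem adds a single document to the active bulk indexer,
	// acquiring an indexer from the pool first when none is active. It
	// returns false when no indexer could be acquired.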
	handleBulkItem := func(item BulkIndexerItem) bool {
		if active == nil {
			// NOTE(marclop) Record the TS when the first document is cached.
			// It doesn't account for the time spent in the buffered channel.
			firstDocTS = time.Now()
			// Return early when the Close() context expires before a.pool.Get
			// returns an indexer. This could happen when all available
			// bulk_requests are in flight and no new BulkIndexers can be
			// pulled from the pool.
			var err error
			active, err = a.pool.Get(a.errgroupContext, a.id)
			if err != nil {
				a.config.Logger.Warn("failed to get bulk indexer from pool", zap.Error(err))
				return false
			}
			// The BulkIndexer may have been used by another appender, so we
			// need to reset it to ensure we're using the right client.
			active.SetClient(a.client)
			a.addUpDownCount(-1, &a.availableBulkRequests, a.metrics.availableBulkRequests)
			a.addUpDownCount(1, nil, a.metrics.inflightBulkrequests)
			flushTimer.Reset(a.config.FlushInterval)
		}
		if err := active.Add(item); err != nil {
			a.config.Logger.Error("failed to Add item to bulk indexer", zap.Error(err))
		}
		return true
	}
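	// Event loop: consume bulk items until the Appender is closed, counting
	// timer-driven flushes (timedFlush) and size-driven flushes (fullFlush)
	// so the scaling logic at the bottom of the loop can react to sustained
	// under- or over-utilization.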
	for !closed {
		select {
		case <-flushTimer.C:
			timedFlush++
			fullFlush = 0
		default:
			// When there's no active indexer and queue utilization is below 5%,
			// reset the flushTimer with IdleInterval so excess active indexers
			// that remain idle can be scaled down.
			if !a.config.Scaling.Disabled && active == nil {
				if a.scalingInformation().activeIndexers > 1 &&
					float64(len(a.bulkItems))/float64(cap(a.bulkItems)) <= 0.05 {
					flushTimer.Reset(a.config.Scaling.IdleInterval)
				}
			}
			select {
			case <-a.closed:
				// Consume whatever bulk items have been buffered,
				// and then flush a last time below.
				for len(a.bulkItems) > 0 {
					select {
					case item := <-a.bulkItems:
						handleBulkItem(item)
					default:
						// Another goroutine took the item.
					}
				}
				closed = true
			case <-flushTimer.C:
				timedFlush++
				fullFlush = 0
			case item := <-a.bulkItems:
				if handleBulkItem(item) {
					if active.Len() < a.config.FlushBytes {
						continue
					}
					fullFlush++
					timedFlush = 0
					// The active indexer is at or exceeds the configured FlushBytes
					// threshold, so flush it.
					if !flushTimer.Stop() {
						<-flushTimer.C
					}
				}
			}
		}
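		// Hand the filled (or timed-out) indexer to a background goroutine:
		// flush it, return it to the pool, and record the flush and buffer
		// durations.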
		if active != nil {
			indexer := active
			active = nil
			attrs := metric.WithAttributeSet(a.config.MetricAttributes)
			a.errgroup.Go(func() error {
				var err error
				took := timeFunc(func() {
					err = a.flush(a.errgroupContext, indexer)
				})
				indexer.Reset()
				a.pool.Put(a.id, indexer)
				a.addUpDownCount(1, &a.availableBulkRequests, a.metrics.availableBulkRequests)
				a.addUpDownCount(-1, nil, a.metrics.inflightBulkrequests, attrs)
				a.metrics.flushDuration.Record(context.Background(), took.Seconds(),
					attrs,
				)
				return err
			})
			a.metrics.bufferDuration.Record(context.Background(),
				time.Since(firstDocTS).Seconds(), attrs,
			)
		}
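		// Re-evaluate scaling after each flush cycle: scale down (this
		// goroutine exits) based on the timed-flush count, or scale up
		// (spawn another runActiveIndexer goroutine) based on the
		// full-flush count.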
		if a.config.Scaling.Disabled {
			continue
		}
		now := time.Now()
		info := a.scalingInformation()
		if a.maybeScaleDown(now, info, &timedFlush) {
			a.addCount(1, &a.activeDestroyed, a.metrics.activeDestroyed)
			return
		}
		if a.maybeScaleUp(now, info, &fullFlush) {
			a.addCount(1, &a.activeCreated, a.metrics.activeCreated)
			a.errgroup.Go(func() error {
				a.runActiveIndexer()
				return nil
			})
		}
	}
	// Decrement the active indexers count when the Appender is closed.
	for {
		info := a.scalingInformation()
		if a.scalingInfo.CompareAndSwap(info, scalingInfo{
			lastAction:     time.Now(),
			activeIndexers: info.activeIndexers - 1,
		}) {
			return
		}
	}
}