func()

in appender.go [479:611]


// runActiveIndexer is the event loop of one active indexer goroutine.
//
// It receives items from a.bulkItems, buffers them in a BulkIndexer taken
// from a.pool, and hands that indexer to a.errgroup for an asynchronous
// flush when one of the following happens:
//   - flushTimer fires,
//   - the indexer's Len() reaches a.config.FlushBytes, or
//   - a receive from a.closed succeeds (after a.bulkItems has been drained).
//
// Unless a.config.Scaling.Disabled is set, each pass that reaches the bottom
// of the loop also consults maybeScaleDown and maybeScaleUp: on scale-down
// the method returns immediately, on scale-up it starts another
// runActiveIndexer in a.errgroup.
//
// When the loop ends because of a.closed, activeIndexers in a.scalingInfo is
// decremented by one before returning.
func (a *Appender) runActiveIndexer() {
	// closed is set once a receive from a.closed succeeds; it ends the loop
	// after one last flush.
	var closed bool
	// active is the indexer currently being filled, or nil when there is
	// nothing buffered.
	var active *BulkIndexer
	// timedFlush counts flushTimer expirations (including ones where active
	// is nil and nothing is flushed) and fullFlush counts FlushBytes-triggered
	// flushes. Each one is zeroed when the other is incremented, and both are
	// passed by pointer to the scaling helpers below.
	var timedFlush uint
	var fullFlush uint
	// Create the timer, then stop it and drain its channel if it already
	// fired, so that no expiry is pending until the timer is Reset below.
	flushTimer := time.NewTimer(a.config.FlushInterval)
	if !flushTimer.Stop() {
		<-flushTimer.C
	}
	// firstDocTS is the time the first item of the current batch was handled;
	// it is used to record bufferDuration when the batch is dispatched.
	var firstDocTS time.Time
	// handleBulkItem adds item to the active indexer, acquiring one from the
	// pool first if needed. It returns false only when no indexer could be
	// acquired, in which case item is not added to any indexer. A failing
	// Add is logged and still reported as true.
	handleBulkItem := func(item BulkIndexerItem) bool {
		if active == nil {
			// NOTE(marclop) Record the TS when the first document is cached.
			// It doesn't account for the time spent in the buffered channel.
			firstDocTS = time.Now()
			// Return early when the Close() context expires before get returns
			// an indexer. This could happen when all available bulk_requests
			// are in flight & no new BulkIndexers can be pulled from the pool.
			var err error
			active, err = a.pool.Get(a.errgroupContext, a.id)
			if err != nil {
				a.config.Logger.Warn("failed to get bulk indexer from pool", zap.Error(err))
				return false
			}
			// The BulkIndexer may have been used by another appender, we need
			// to reset it to ensure we're using the right client.
			active.SetClient(a.client)

			// One fewer bulk request available, one more in flight. Both are
			// reversed in the flush goroutine below.
			a.addUpDownCount(-1, &a.availableBulkRequests, a.metrics.availableBulkRequests)
			a.addUpDownCount(1, nil, a.metrics.inflightBulkrequests)
			// Start the flush interval for this new batch.
			flushTimer.Reset(a.config.FlushInterval)
		}
		if err := active.Add(item); err != nil {
			a.config.Logger.Error("failed to Add item to bulk indexer", zap.Error(err))
		}
		return true
	}
	for !closed {
		// Non-blocking check of the timer first: an expiry that is already
		// pending is always handled before the blocking select below, which
		// would otherwise choose at random among its ready cases.
		select {
		case <-flushTimer.C:
			timedFlush++
			fullFlush = 0
		default:
			// When there's no active indexer and queue utilization is below 5%,
			// reset the flushTimer with IdleInterval so excess active indexers
			// that remain idle can be scaled down.
			if !a.config.Scaling.Disabled && active == nil {
				if a.scalingInformation().activeIndexers > 1 &&
					float64(len(a.bulkItems))/float64(cap(a.bulkItems)) <= 0.05 {
					flushTimer.Reset(a.config.Scaling.IdleInterval)
				}
			}
			// Block until the appender is closed, the timer fires, or an
			// item arrives.
			select {
			case <-a.closed:
				// Consume whatever bulk items have been buffered,
				// and then flush a last time below.
				// The return value of handleBulkItem is ignored here: items
				// for which no indexer could be acquired are dropped.
				for len(a.bulkItems) > 0 {
					select {
					case item := <-a.bulkItems:
						handleBulkItem(item)
					default:
						// Another goroutine took the item.
					}
				}
				closed = true
			case <-flushTimer.C:
				timedFlush++
				fullFlush = 0
			case item := <-a.bulkItems:
				// If handleBulkItem returns false, control falls through to
				// the flush check below; active is whatever pool.Get returned
				// alongside its error (presumably nil — confirm).
				if handleBulkItem(item) {
					if active.Len() < a.config.FlushBytes {
						// Below the size threshold: keep buffering. This skips
						// the flush and the scaling checks for this pass.
						continue
					}
					fullFlush++
					timedFlush = 0
					// The active indexer is at or exceeds the configured FlushBytes
					// threshold, so flush it.
					// Stop the timer, draining the channel if it fired in the
					// meantime, so that a stale expiry is not observed on a
					// later pass.
					if !flushTimer.Stop() {
						<-flushTimer.C
					}
				}
			}
		}
		if active != nil {
			// Hand the filled indexer over to a flush goroutine and clear
			// active so that the next item acquires a fresh indexer.
			indexer := active
			active = nil
			attrs := metric.WithAttributeSet(a.config.MetricAttributes)
			a.errgroup.Go(func() error {
				var err error
				// NOTE(review): timeFunc is defined elsewhere; its result is
				// used as a duration (took.Seconds()) — presumably the time
				// spent in the callback.
				took := timeFunc(func() {
					err = a.flush(a.errgroupContext, indexer)
				})
				// Regardless of the flush outcome, reset the indexer, return
				// it to the pool and reverse the counters adjusted when it
				// was acquired.
				indexer.Reset()
				a.pool.Put(a.id, indexer)
				a.addUpDownCount(1, &a.availableBulkRequests, a.metrics.availableBulkRequests)
				a.addUpDownCount(-1, nil, a.metrics.inflightBulkrequests, attrs)
				a.metrics.flushDuration.Record(context.Background(), took.Seconds(),
					attrs,
				)
				// The flush error is returned to the errgroup.
				return err
			})
			// Recorded on this goroutine at dispatch time: the time between
			// handling the batch's first item and handing it off to flush,
			// not including the flush itself.
			a.metrics.bufferDuration.Record(context.Background(),
				time.Since(firstDocTS).Seconds(), attrs,
			)
		}
		if a.config.Scaling.Disabled {
			continue
		}
		now := time.Now()
		info := a.scalingInformation()
		if a.maybeScaleDown(now, info, &timedFlush) {
			a.addCount(1, &a.activeDestroyed, a.metrics.activeDestroyed)
			// NOTE(review): this return bypasses the activeIndexers decrement
			// at the end of the method; presumably maybeScaleDown has already
			// updated a.scalingInfo — confirm.
			return
		}
		if a.maybeScaleUp(now, info, &fullFlush) {
			a.addCount(1, &a.activeCreated, a.metrics.activeCreated)
			// Start an additional active indexer loop in the same errgroup.
			a.errgroup.Go(func() error {
				a.runActiveIndexer()
				return nil
			})
		}
	}
	// Decrement the active bulk requests when Appender is closed.
	// The compare-and-swap is retried with a freshly read scalingInfo until
	// it succeeds, so a concurrent update by another goroutine is not lost.
	for {
		info := a.scalingInformation()
		if a.scalingInfo.CompareAndSwap(info, scalingInfo{
			lastAction:     time.Now(),
			activeIndexers: info.activeIndexers - 1,
		}) {
			return
		}
	}
}