func()

in internal/pkg/bulk/engine.go [345:456]


// Run is the bulker's main loop. It receives blocks from b.ch, groups them
// into per-type queues, and flushes every non-empty queue when either the item
// count reaches opts.flushThresholdCnt, the byte count reaches
// opts.flushThresholdSz, or the flush timer (opts.flushInterval, armed by the
// first item queued after a flush) fires.
//
// Run blocks until ctx is done or a flush fails. It returns ctx.Err() in the
// former case and the flush error in the latter; it never returns nil. Blocks
// still queued when the loop exits are not flushed. On exit, the cancel
// function of every bulker in b.bulkerMap is invoked.
func (b *Bulker) Run(ctx context.Context) error {
	var err error

	zerolog.Ctx(ctx).Info().Interface("opts", &b.opts).Msg("Run bulker with options")

	// Create timer in stopped state; it is armed when the first item arrives.
	timer := time.NewTimer(b.opts.flushInterval)
	stopTimer(timer)
	defer timer.Stop()

	// Cancel the context of each remote bulker when Run exits. This is
	// registered before the loop (rather than just ahead of the return) so
	// that it also runs if the loop or a flush panics. Being deferred after
	// timer.Stop, it still executes before it, as it did previously.
	defer func() {
		b.remoteOutputMutex.RLock()
		defer b.remoteOutputMutex.RUnlock()
		for _, bulker := range b.bulkerMap {
			bulker.CancelFn()()
		}
	}()

	// Weighted semaphore handed to flushQueue, sized by opts.maxPending.
	w := semaphore.NewWeighted(int64(b.opts.maxPending))

	// One queue per queue type; each queue records its own type.
	var queues [kNumQueues]queueT
	for i := queueType(0); i < kNumQueues; i++ {
		queues[i].ty = i
	}

	// Totals across all queues since the last flush, used for threshold tests.
	var itemCnt int
	var byteCnt int

	// doFlush flushes every queue with pending bytes and resets it. It stops
	// at the first flush error, leaving the remaining queues and the
	// threshold counters untouched (the caller exits the loop on error).
	doFlush := func() error {

		for i := range queues {
			q := &queues[i]
			if q.pending > 0 {

				// Pass queue structure by value so the local copy can be
				// reset independently below.
				if err := b.flushQueue(ctx, w, *q); err != nil {
					return err
				}

				// Reset local queue stored in array
				q.cnt = 0
				q.head = nil
				q.pending = 0
			}
		}

		// Reset threshold counters
		itemCnt = 0
		byteCnt = 0

		return nil
	}

	for err == nil {

		select {

		case blk := <-b.ch:

			queueIdx := blkToQueueType(blk)
			q := &queues[queueIdx]

			// Prepend block to head of target queue
			blk.next = q.head
			q.head = blk

			// Size of this block's buffer, counted against both the queue
			// and the global byte threshold.
			sz := blk.buf.Len()

			// Update pending count on target queue
			q.cnt++
			q.pending += sz

			// Update threshold counters
			itemCnt++
			byteCnt += sz

			// Start timer on first queued item
			if itemCnt == 1 {
				timer.Reset(b.opts.flushInterval)
			}

			// Threshold test, short circuit timer on pending count
			if itemCnt >= b.opts.flushThresholdCnt || byteCnt >= b.opts.flushThresholdSz {
				zerolog.Ctx(ctx).Trace().
					Str("mod", kModBulk).
					Int("itemCnt", itemCnt).
					Int("byteCnt", byteCnt).
					Msg("Flush on threshold")

				err = doFlush()

				// Everything was just flushed; the timer is re-armed by the
				// next first item.
				stopTimer(timer)
			}

		case <-timer.C:
			zerolog.Ctx(ctx).Trace().
				Str("mod", kModBulk).
				Int("itemCnt", itemCnt).
				Int("byteCnt", byteCnt).
				Msg("Flush on timer")

			err = doFlush()

		case <-ctx.Done():
			err = ctx.Err()
		}

	}

	return err
}