in internal/pkg/bulk/engine.go [345:456]
// Run is the bulker's main loop.
//
// It receives blocks from b.ch, links each one into the queue selected by
// blkToQueueType, and flushes all non-empty queues through flushQueue when
// either the item-count or byte-size threshold is reached, or when the flush
// timer (started on the first queued item) fires.
//
// The loop ends when a flush returns an error or when ctx is done; that error
// (ctx.Err() on cancellation) is returned. Blocks that are still queued when
// the loop ends are not flushed by this function. On exit, the cancel function
// of every entry in b.bulkerMap is invoked.
func (b *Bulker) Run(ctx context.Context) error {
	var err error

	zerolog.Ctx(ctx).Info().Interface("opts", &b.opts).Msg("Run bulker with options")

	// Create timer in stopped state
	timer := time.NewTimer(b.opts.flushInterval)
	stopTimer(timer)
	defer timer.Stop()

	// Cancel the context of each remote bulker when Run exits. Registered
	// before the loop (rather than just ahead of the return) so the cleanup
	// also happens if the loop unwinds through a panic. Deferred functions
	// run last-in first-out, so this still executes before timer.Stop above.
	defer func() {
		b.remoteOutputMutex.RLock()
		defer b.remoteOutputMutex.RUnlock()
		for _, remote := range b.bulkerMap {
			remote.CancelFn()()
		}
	}()

	// Weighted semaphore sized by maxPending; handed to every flushQueue call.
	w := semaphore.NewWeighted(int64(b.opts.maxPending))

	// One queue per queue type; each queue records its own type.
	var queues [kNumQueues]queueT

	var i queueType
	for ; i < kNumQueues; i++ {
		queues[i].ty = i
	}

	// Totals across all queues since the last flush, used for threshold tests.
	var itemCnt int
	var byteCnt int

	// doFlush flushes every queue that has pending bytes and resets the local
	// bookkeeping. It stops at the first flushQueue error; in that case the
	// failing queue and the threshold counters are left as they were.
	doFlush := func() error {
		for i := range queues {
			q := &queues[i]
			if q.pending > 0 {
				// Pass queue structure by value
				if err := b.flushQueue(ctx, w, *q); err != nil {
					return err
				}

				// Reset local queue stored in array
				q.cnt = 0
				q.head = nil
				q.pending = 0
			}
		}

		// Reset threshold counters
		itemCnt = 0
		byteCnt = 0

		return nil
	}

	for err == nil {
		select {
		case blk := <-b.ch:
			queueIdx := blkToQueueType(blk)
			q := &queues[queueIdx]

			// Prepend block to head of target queue
			blk.next = q.head
			q.head = blk

			// Update pending count on target queue
			q.cnt++
			q.pending += blk.buf.Len()

			// Update threshold counters
			itemCnt++
			byteCnt += blk.buf.Len()

			// Start timer on first queued item
			if itemCnt == 1 {
				timer.Reset(b.opts.flushInterval)
			}

			// Threshold test, short circuit timer on pending count
			if itemCnt >= b.opts.flushThresholdCnt || byteCnt >= b.opts.flushThresholdSz {
				zerolog.Ctx(ctx).Trace().
					Str("mod", kModBulk).
					Int("itemCnt", itemCnt).
					Int("byteCnt", byteCnt).
					Msg("Flush on threshold")

				err = doFlush()

				// The queues were just flushed, so the pending timer is no
				// longer needed; the next queued item restarts it.
				stopTimer(timer)
			}

		case <-timer.C:
			zerolog.Ctx(ctx).Trace().
				Str("mod", kModBulk).
				Int("itemCnt", itemCnt).
				Int("byteCnt", byteCnt).
				Msg("Flush on timer")

			err = doFlush()

		case <-ctx.Done():
			err = ctx.Err()
		}
	}

	return err
}