in producer/mover.go [34:75]
// run is the mover's main loop. Until moverShutDownFlag is set it repeatedly:
//   - collects every accumulated batch whose linger deadline
//     (createTimeMs + config.LingerMs) has passed and submits it to the
//     thread pool,
//   - submits whatever batches the retry queue hands back,
//   - sleeps, then calls clearEmptyKeys.
//
// On exit, sendRemaining runs first and moverWaitGroup.Done second, because
// deferred calls execute in reverse order of registration.
func (mover *Mover) run(moverWaitGroup *sync.WaitGroup, config *ProducerConfig) {
	defer moverWaitGroup.Done()
	defer mover.sendRemaining()

	for {
		if mover.moverShutDownFlag.Load() {
			return
		}

		// Start from the full linger period and shrink it to the soonest
		// pending deadline found during the scan below.
		waitMs := config.LingerMs
		now := time.Now().UnixMilli()
		var expired []*ProducerBatch

		mover.logAccumulator.lock.Lock()
		for key, batch := range mover.logAccumulator.logGroupData {
			if batch == nil {
				continue
			}
			remaining := batch.createTimeMs + config.LingerMs - now
			switch {
			case remaining <= 0:
				// Deadline reached: take the batch and leave a nil entry
				// under its key.
				expired = append(expired, batch)
				mover.logAccumulator.logGroupData[key] = nil
			case remaining < waitMs:
				waitMs = remaining
			}
		}
		mover.logAccumulator.lock.Unlock()

		// Submission happens after the accumulator lock has been released.
		for _, batch := range expired {
			mover.threadPool.addTask(batch)
		}

		retries := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag.Load())

		// When retry batches were returned, pause for only one millisecond;
		// otherwise sleep for the computed wait.
		pause := time.Duration(waitMs) * time.Millisecond
		if len(retries) > 0 {
			pause = time.Millisecond
		}
		for _, batch := range retries {
			mover.threadPool.addTask(batch)
		}
		time.Sleep(pause)

		mover.clearEmptyKeys()
	}
}