func()

in producer/mover.go [34:75]


// run is the mover's polling loop. It keeps iterating until
// moverShutDownFlag reports true, and on each pass it:
//
//  1. Walks logAccumulator.logGroupData while holding the accumulator lock.
//     Any non-nil batch whose createTimeMs + config.LingerMs is at or before
//     the current time is collected and its map slot is set to nil; for the
//     remaining batches the shortest time left is remembered.
//  2. Submits the collected batches to the thread pool (after the lock has
//     been released).
//  3. Submits whatever getRetryBatch returns to the thread pool.
//  4. Sleeps 1ms if any retry batches were returned; otherwise sleeps for
//     the shortest time left found in step 1, capped at config.LingerMs.
//  5. Calls clearEmptyKeys.
//
// When the loop exits, sendRemaining runs first and moverWaitGroup.Done is
// called last, because deferred calls execute in reverse order.
func (mover *Mover) run(moverWaitGroup *sync.WaitGroup, config *ProducerConfig) {
	defer moverWaitGroup.Done()
	defer mover.sendRemaining()

	for {
		if mover.moverShutDownFlag.Load() {
			break
		}

		// Start from the full linger interval; shortened below when some
		// pending batch is due sooner than that.
		waitMs := config.LingerMs
		now := time.Now().UnixMilli()
		var expired []*ProducerBatch

		mover.logAccumulator.lock.Lock()
		for k, pending := range mover.logAccumulator.logGroupData {
			if pending == nil {
				continue
			}
			remaining := pending.createTimeMs + config.LingerMs - now
			switch {
			case remaining <= 0:
				// Linger time is up: take the batch and blank its slot.
				expired = append(expired, pending)
				mover.logAccumulator.logGroupData[k] = nil
			case remaining < waitMs:
				waitMs = remaining
			}
		}
		mover.logAccumulator.lock.Unlock()

		// Hand-off happens outside the accumulator lock.
		for _, ready := range expired {
			mover.threadPool.addTask(ready)
		}

		retries := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag.Load())
		for _, again := range retries {
			mover.threadPool.addTask(again)
		}

		pause := time.Duration(waitMs) * time.Millisecond
		if len(retries) > 0 {
			// Retry work was found, so only pause briefly before polling again.
			pause = time.Millisecond
		}
		time.Sleep(pause)

		mover.clearEmptyKeys()
	}

}