producer/mover.go (89 lines of code) (raw):
package producer
import (
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.uber.org/atomic"
)
type Mover struct {
moverShutDownFlag *atomic.Bool
retryQueue *RetryQueue
ioWorker *IoWorker
logAccumulator *LogAccumulator
logger log.Logger
threadPool *IoThreadPool
}
func initMover(logAccumulator *LogAccumulator, retryQueue *RetryQueue, ioWorker *IoWorker, logger log.Logger, threadPool *IoThreadPool) *Mover {
mover := &Mover{
moverShutDownFlag: atomic.NewBool(false),
retryQueue: retryQueue,
ioWorker: ioWorker,
logAccumulator: logAccumulator,
logger: logger,
threadPool: threadPool,
}
return mover
}
func (mover *Mover) run(moverWaitGroup *sync.WaitGroup, config *ProducerConfig) {
defer moverWaitGroup.Done()
defer mover.sendRemaining()
for !mover.moverShutDownFlag.Load() {
sleepMs := config.LingerMs
nowTimeMs := time.Now().UnixMilli()
toSendBatches := make([]*ProducerBatch, 0)
mover.logAccumulator.lock.Lock()
for key, batch := range mover.logAccumulator.logGroupData {
if batch == nil {
continue
}
timeInterval := batch.createTimeMs + config.LingerMs - nowTimeMs
if timeInterval <= 0 {
toSendBatches = append(toSendBatches, batch)
mover.logAccumulator.logGroupData[key] = nil
} else if sleepMs > timeInterval {
sleepMs = timeInterval
}
}
mover.logAccumulator.lock.Unlock()
for _, batch := range toSendBatches {
mover.threadPool.addTask(batch)
}
retryBatches := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag.Load())
if len(retryBatches) > 0 {
for _, batch := range retryBatches {
mover.threadPool.addTask(batch)
}
time.Sleep(time.Millisecond)
} else {
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
}
mover.clearEmptyKeys()
}
}
func (mover *Mover) clearEmptyKeys() {
mover.logAccumulator.lock.Lock()
if len(mover.logAccumulator.logGroupData) > 1000 {
for key, batch := range mover.logAccumulator.logGroupData {
if batch == nil {
delete(mover.logAccumulator.logGroupData, key)
}
}
}
mover.logAccumulator.lock.Unlock()
}
func (mover *Mover) sendRemaining() {
mover.logAccumulator.lock.Lock()
for _, batch := range mover.logAccumulator.logGroupData {
if batch != nil && batch.totalDataSize > 0 {
mover.threadPool.addTask(batch)
}
}
mover.logAccumulator.logGroupData = make(map[string]*ProducerBatch)
mover.logAccumulator.lock.Unlock()
producerBatchList := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag.Load())
for _, batch := range producerBatchList {
mover.threadPool.addTask(batch)
}
level.Info(mover.logger).Log("msg", "mover thread send remaining complete")
}