producer/retry_queue.go (67 lines of code) (raw):

package producer import ( "container/heap" "sync" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" ) // RetryQueue cache ProducerBatch and retry latter type RetryQueue struct { batch []*ProducerBatch mutex sync.Mutex } func initRetryQueue() *RetryQueue { retryQueue := RetryQueue{} heap.Init(&retryQueue) return &retryQueue } func (retryQueue *RetryQueue) sendToRetryQueue(producerBatch *ProducerBatch, logger log.Logger) { level.Debug(logger).Log("msg", "Send to retry queue") retryQueue.mutex.Lock() defer retryQueue.mutex.Unlock() if producerBatch != nil { heap.Push(retryQueue, producerBatch) } } func (retryQueue *RetryQueue) getRetryBatch(moverShutDownFlag bool) (producerBatchList []*ProducerBatch) { retryQueue.mutex.Lock() defer retryQueue.mutex.Unlock() if !moverShutDownFlag { for retryQueue.Len() > 0 { producerBatch := heap.Pop(retryQueue) if producerBatch.(*ProducerBatch).nextRetryMs < time.Now().UnixMilli() { producerBatchList = append(producerBatchList, producerBatch.(*ProducerBatch)) } else { heap.Push(retryQueue, producerBatch.(*ProducerBatch)) break } } } else { for retryQueue.Len() > 0 { producerBatch := heap.Pop(retryQueue) producerBatchList = append(producerBatchList, producerBatch.(*ProducerBatch)) } } return producerBatchList } func (retryQueue *RetryQueue) Len() int { return len(retryQueue.batch) } func (retryQueue *RetryQueue) Less(i, j int) bool { return retryQueue.batch[i].nextRetryMs < retryQueue.batch[j].nextRetryMs } func (retryQueue *RetryQueue) Swap(i, j int) { retryQueue.batch[i], retryQueue.batch[j] = retryQueue.batch[j], retryQueue.batch[i] } func (retryQueue *RetryQueue) Push(x interface{}) { item := x.(*ProducerBatch) retryQueue.batch = append(retryQueue.batch, item) } func (retryQueue *RetryQueue) Pop() interface{} { old := retryQueue.batch n := len(old) item := old[n-1] old[n-1] = nil retryQueue.batch = old[0 : n-1] return item }