in consumer/process_queue.go [237:289]
func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer) {
if pc.option.ConsumeOrderly {
return
}
var loop = 16
if pq.msgCache.Size() < 16 {
loop = pq.msgCache.Size()
}
for i := 0; i < loop; i++ {
pq.mutex.RLock()
if pq.msgCache.Empty() {
pq.mutex.RUnlock()
return
}
_, firstValue := pq.msgCache.Min()
msg := firstValue.(*primitive.MessageExt)
startTime := msg.GetProperty(primitive.PropertyConsumeStartTime)
if startTime != "" {
st, err := strconv.ParseInt(startTime, 10, 64)
if err != nil {
rlog.Warning("parse message start consume time error", map[string]interface{}{
"time": startTime,
rlog.LogKeyUnderlayError: err,
})
pq.mutex.RUnlock()
continue
}
if time.Now().UnixNano()/1e6-st <= int64(pc.option.ConsumeTimeout/time.Millisecond) {
pq.mutex.RUnlock()
return
}
rlog.Info("send expire msg back. ", map[string]interface{}{
rlog.LogKeyTopic: msg.Topic,
rlog.LogKeyMessageId: msg.MsgId,
"startTime": startTime,
rlog.LogKeyStoreHost: msg.StoreHost,
rlog.LogKeyQueueId: msg.Queue.QueueId,
rlog.LogKeyQueueOffset: msg.QueueOffset,
})
pq.mutex.RUnlock()
if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) {
rlog.Error("send message back to broker error when clean expired messages", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
continue
}
pq.removeMessage(msg)
} else {
pq.mutex.RUnlock()
}
}
}