func()

in consumer/process_queue.go [237:289]


// cleanExpiredMsg looks at the head of the message cache (the entry returned
// by pq.msgCache.Min) and, while that message has been under consumption for
// longer than pc.option.ConsumeTimeout, sends it back to the broker via
// pc.sendMessageBack and then calls pq.removeMessage on it.
//
// It is a no-op when pc.option.ConsumeOrderly is set. At most 16 messages (or
// the cache size at entry, if smaller) are examined per call. Scanning stops
// as soon as the head message is missing, has no consume-start timestamp, has
// an unparsable timestamp, or has not timed out yet: in every one of those
// cases the head would be unchanged on the next iteration, so looping again
// could not make progress.
func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer) {
	if pc.option.ConsumeOrderly {
		return
	}

	// Read the size under the same lock that guards every other msgCache
	// access in this function, instead of racing with concurrent writers.
	pq.mutex.RLock()
	loop := pq.msgCache.Size()
	pq.mutex.RUnlock()
	if loop > 16 {
		loop = 16
	}

	// nextExpired returns the head message if it has exceeded the consume
	// timeout, or nil when scanning should stop. The read lock is held only
	// for the duration of the inspection and is always released via defer.
	nextExpired := func() *primitive.MessageExt {
		pq.mutex.RLock()
		defer pq.mutex.RUnlock()

		if pq.msgCache.Empty() {
			return nil
		}
		_, firstValue := pq.msgCache.Min()
		msg := firstValue.(*primitive.MessageExt)

		startTime := msg.GetProperty(primitive.PropertyConsumeStartTime)
		if startTime == "" {
			// No start timestamp recorded on the head message; nothing to
			// expire right now.
			return nil
		}
		st, err := strconv.ParseInt(startTime, 10, 64)
		if err != nil {
			// Warn once and stop: retrying would hit the very same message
			// and only repeat this warning.
			rlog.Warning("parse message start consume time error", map[string]interface{}{
				"time":                   startTime,
				rlog.LogKeyUnderlayError: err,
			})
			return nil
		}
		// st is compared against the current time in milliseconds.
		if time.Now().UnixNano()/1e6-st <= int64(pc.option.ConsumeTimeout/time.Millisecond) {
			return nil
		}
		rlog.Info("send expire msg back. ", map[string]interface{}{
			rlog.LogKeyTopic:       msg.Topic,
			rlog.LogKeyMessageId:   msg.MsgId,
			"startTime":            startTime,
			rlog.LogKeyStoreHost:   msg.StoreHost,
			rlog.LogKeyQueueId:     msg.Queue.QueueId,
			rlog.LogKeyQueueOffset: msg.QueueOffset,
		})
		return msg
	}

	for i := 0; i < loop; i++ {
		msg := nextExpired()
		if msg == nil {
			return
		}
		// The lock is not held here, so the send-back call cannot block
		// other users of the cache.
		if !pc.sendMessageBack(msg.Queue.BrokerName, msg, int(3+msg.ReconsumeTimes)) {
			rlog.Error("send message back to broker error when clean expired messages", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
			})
			// Leave the message cached; the next iteration retries it.
			continue
		}
		pq.removeMessage(msg)
	}
}