func()

in consumer/process_queue.go [97:144]


func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
	if len(messages) == 0 {
		return
	}
	if pq.IsDroppd() {
		return
	}
	pq.mutex.Lock()
	validMessageCount := 0
	for idx := range messages {
		msg := messages[idx]
		_, found := pq.msgCache.Get(msg.QueueOffset)
		if found {
			continue
		}
		_, found = pq.consumingMsgOrderlyTreeMap.Get(msg.QueueOffset)
		if found {
			continue
		}
		pq.msgCache.Put(msg.QueueOffset, msg)
		validMessageCount++
		pq.queueOffsetMax = msg.QueueOffset

		pq.cachedMsgSize.Add(int64(len(msg.Body)))
	}
	pq.cachedMsgCount.Add(int64(validMessageCount))
	pq.mutex.Unlock()
	if !pq.order {
		select {
		case <-pq.closeChan:
			return
		case pq.msgCh <- messages:
		}
	}

	if pq.cachedMsgCount.Load() > 0 && !pq.consuming {
		pq.consuming = true
	}

	msg := messages[len(messages)-1]
	maxOffset, err := strconv.ParseInt(msg.GetProperty(primitive.PropertyMaxOffset), 10, 64)
	if err != nil {
		acc := maxOffset - msg.QueueOffset
		if acc > 0 {
			pq.msgAccCnt = acc
		}
	}
}