func()

in consumer/consumer.go [680:764]


// updateProcessQueueTable reconciles dc.processQueueTable with mqs, the message
// queues the caller passes in for topic.
//
// It runs two passes:
//
//  1. Every tracked queue whose Topic equals topic is dropped when it is no
//     longer contained in mqs, or when its process queue reports
//     isPullExpired and the consumer type is _PushConsume. The entry is only
//     deleted from the table if removeUnnecessaryMessageQueue returns true.
//  2. Every queue in mqs that is not tracked yet gets a new process queue
//     stored in the table, and a PullRequest starting at the offset returned
//     by computePullFromWhereWithException is sent on dc.prCh. When
//     dc.consumeOrderly is set, the queue is skipped unless dc.lock succeeds.
//
// It returns true if at least one entry was removed from or added to the table.
func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitive.MessageQueue) bool {
	var changed bool

	// Value-keyed set of the queues passed in, used for membership checks below.
	mqSet := make(map[primitive.MessageQueue]bool, len(mqs))
	for idx := range mqs {
		mqSet[*mqs[idx]] = true
	}

	// Pass 1: drop tracked queues of this topic that are gone or stale.
	dc.processQueueTable.Range(func(key, value interface{}) bool {
		mq := key.(primitive.MessageQueue)
		pq := value.(*processQueue)
		if mq.Topic != topic {
			return true
		}

		if !mqSet[mq] {
			// The queue is not part of the new set any more.
			pq.WithDropped(true)
			if dc.removeUnnecessaryMessageQueue(&mq, pq) {
				dc.processQueueTable.Delete(key)
				changed = true
				rlog.Info("remove unnecessary mq when updateProcessQueueTable", map[string]interface{}{
					rlog.LogKeyConsumerGroup: dc.consumerGroup,
					rlog.LogKeyMessageQueue:  mq.String(),
				})
			}
			return true
		}

		if pq.isPullExpired() && dc.cType == _PushConsume {
			// Still in the set, but pulling has expired: drop the entry so the
			// second pass below re-creates it if the removal succeeds.
			pq.WithDropped(true)
			if dc.removeUnnecessaryMessageQueue(&mq, pq) {
				dc.processQueueTable.Delete(key)
				changed = true
				rlog.Warning("remove unnecessary mq because pull was expired, prepare to fix it", map[string]interface{}{
					rlog.LogKeyConsumerGroup: dc.consumerGroup,
					rlog.LogKeyMessageQueue:  mq.String(),
				})
			}
		}
		return true
	})

	// Pass 2: start pulling from every queue that is not tracked yet.
	for item := range mqSet {
		// Copy the range variable: &mq is stored in the PullRequest sent on
		// dc.prCh, and before Go 1.22 the range variable is reused across
		// iterations, so without the copy a later iteration would overwrite the
		// queue referenced by a request that was already sent.
		mq := item
		if _, exist := dc.processQueueTable.Load(mq); exist {
			continue
		}
		if dc.consumeOrderly && !dc.lock(&mq) {
			rlog.Warning("do defaultConsumer, add a new mq failed, because lock failed", map[string]interface{}{
				rlog.LogKeyConsumerGroup: dc.consumerGroup,
				rlog.LogKeyMessageQueue:  mq.String(),
			})
			continue
		}

		dc.storage.remove(&mq)
		nextOffset, err := dc.computePullFromWhereWithException(&mq)
		if err != nil || nextOffset < 0 {
			// Include the cause and the offending offset; without them the
			// failure cannot be diagnosed from the log.
			rlog.Warning("do defaultConsumer, add a new mq failed", map[string]interface{}{
				rlog.LogKeyConsumerGroup: dc.consumerGroup,
				rlog.LogKeyMessageQueue:  mq.String(),
				rlog.LogKeyUnderlayError: err,
				"nextOffset":             nextOffset,
			})
			continue
		}

		// Re-check: the entry may have been stored while the offset was computed.
		if _, exist := dc.processQueueTable.Load(mq); exist {
			rlog.Debug("updateProcessQueueTable do defaultConsumer, mq already exist", map[string]interface{}{
				rlog.LogKeyConsumerGroup: dc.consumerGroup,
				rlog.LogKeyMessageQueue:  mq.String(),
			})
			continue
		}

		rlog.Debug("updateProcessQueueTable do defaultConsumer, add a new mq", map[string]interface{}{
			rlog.LogKeyConsumerGroup: dc.consumerGroup,
			rlog.LogKeyMessageQueue:  mq.String(),
		})
		pq := newProcessQueue(dc.consumeOrderly)
		dc.processQueueTable.Store(mq, pq)
		pr := PullRequest{
			consumerGroup: dc.consumerGroup,
			mq:            &mq,
			pq:            pq,
			nextOffset:    nextOffset,
		}
		dc.prCh <- pr
		changed = true
	}

	return changed
}