in consumer/consumer.go [680:764]
// updateProcessQueueTable reconciles dc.processQueueTable with mqs, the set of
// message queues that should currently be consumed for topic.
//
// It works in two passes:
//
//  1. Every tracked queue of topic that is no longer in mqs, or (for push
//     consumers) whose process queue reports isPullExpired, is marked as
//     dropped and, if removeUnnecessaryMessageQueue returns true, deleted
//     from the table.
//  2. Every queue in mqs that is not tracked yet gets a new process queue
//     stored in the table and a PullRequest sent on dc.prCh, starting at the
//     offset returned by computePullFromWhereWithException. When
//     dc.consumeOrderly is set, the queue must first be locked via dc.lock;
//     queues that cannot be locked are skipped.
//
// It returns true if at least one queue was removed from or added to the table.
func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitive.MessageQueue) bool {
	var changed bool

	// Value-keyed set of the wanted queues, so membership can be tested with
	// the keys stored in processQueueTable.
	mqSet := make(map[primitive.MessageQueue]bool, len(mqs))
	for _, mq := range mqs {
		if mq == nil {
			// A nil entry identifies no queue; skip it instead of panicking
			// on the dereference below.
			continue
		}
		mqSet[*mq] = true
	}

	// Pass 1: drop queues of this topic that are stale or whose pull expired.
	dc.processQueueTable.Range(func(key, value interface{}) bool {
		mq := key.(primitive.MessageQueue)
		pq := value.(*processQueue)
		if mq.Topic != topic {
			return true
		}

		var expired bool
		switch {
		case !mqSet[mq]:
			// The queue is no longer wanted for this topic.
		case pq.isPullExpired() && dc.cType == _PushConsume:
			// Still wanted, but pulling has expired; remove it so that the
			// second pass can add it back with a new process queue.
			expired = true
		default:
			return true
		}

		pq.WithDropped(true)
		if !dc.removeUnnecessaryMessageQueue(&mq, pq) {
			return true
		}
		dc.processQueueTable.Delete(key)
		changed = true

		fields := map[string]interface{}{
			rlog.LogKeyConsumerGroup: dc.consumerGroup,
			rlog.LogKeyMessageQueue:  mq.String(),
		}
		if expired {
			rlog.Warning("remove unnecessary mq because pull was expired, prepare to fix it", fields)
		} else {
			rlog.Info("remove unnecessary mq when updateProcessQueueTable", fields)
		}
		return true
	})

	// Pass 2: start pulling from wanted queues that are not tracked yet.
	for item := range mqSet {
		// Take a per-iteration copy: &mq is stored in the PullRequest sent on
		// dc.prCh, and if the range variable is reused across iterations the
		// pointer already in the channel would observe the next queue.
		mq := item

		if _, exist := dc.processQueueTable.Load(mq); exist {
			continue
		}
		if dc.consumeOrderly && !dc.lock(&mq) {
			rlog.Warning("do defaultConsumer, add a new mq failed, because lock failed", map[string]interface{}{
				rlog.LogKeyConsumerGroup: dc.consumerGroup,
				rlog.LogKeyMessageQueue:  mq.String(),
			})
			continue
		}

		dc.storage.remove(&mq)
		nextOffset, err := dc.computePullFromWhereWithException(&mq)
		if err != nil || nextOffset < 0 {
			// Report the cause as well; err may be nil when only the offset
			// is negative, so it is logged as a value rather than via Error().
			rlog.Warning("do defaultConsumer, add a new mq failed", map[string]interface{}{
				rlog.LogKeyConsumerGroup: dc.consumerGroup,
				rlog.LogKeyMessageQueue:  mq.String(),
				rlog.LogKeyUnderlayError: err,
				"nextOffset":             nextOffset,
			})
			continue
		}

		// Check again: the entry may have been stored while the lock and
		// offset lookup above were in progress.
		if _, exist := dc.processQueueTable.Load(mq); exist {
			rlog.Debug("updateProcessQueueTable do defaultConsumer, mq already exist", map[string]interface{}{
				rlog.LogKeyConsumerGroup: dc.consumerGroup,
				rlog.LogKeyMessageQueue:  mq.String(),
			})
			continue
		}

		rlog.Debug("updateProcessQueueTable do defaultConsumer, add a new mq", map[string]interface{}{
			rlog.LogKeyConsumerGroup: dc.consumerGroup,
			rlog.LogKeyMessageQueue:  mq.String(),
		})
		pq := newProcessQueue(dc.consumeOrderly)
		dc.processQueueTable.Store(mq, pq)
		dc.prCh <- PullRequest{
			consumerGroup: dc.consumerGroup,
			mq:            &mq,
			pq:            pq,
			nextOffset:    nextOffset,
		}
		changed = true
	}
	return changed
}