in consumer/consumer.go [383:456]
// doBalance re-evaluates the message queues of every subscribed topic.
//
// For each topic in subscriptionDataTable it loads the topic's queues from
// topicSubscribeInfoTable and then, depending on dc.model:
//   - BroadCasting: hands all of the topic's queues to updateProcessQueueTable.
//   - Clustering: sorts the consumer IDs and a copy of the queues, passes them
//     to dc.allocate together with this client's ID, and hands the result to
//     updateProcessQueueTable.
//
// A topic without queue info, or without a consumer ID list, is skipped with a
// warning. When updateProcessQueueTable reports a change, mqChanged is invoked.
// After all topics have been visited, truncateMessageQueueNotMyTopic is called.
func (dc *defaultConsumer) doBalance() {
	dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
		topic := key.(string)
		v, exist := dc.topicSubscribeInfoTable.Load(topic)
		if !exist {
			rlog.Warning("do balance in group failed, the topic does not exist", map[string]interface{}{
				rlog.LogKeyConsumerGroup: dc.consumerGroup,
				rlog.LogKeyTopic:         topic,
			})
			// Returning true keeps Range going so the remaining topics are still balanced.
			return true
		}
		mqs := v.([]*primitive.MessageQueue)
		switch dc.model {
		case BroadCasting:
			changed := dc.updateProcessQueueTable(topic, mqs)
			if changed {
				dc.mqChanged(topic, mqs, mqs)
				rlog.Debug("MessageQueue changed", map[string]interface{}{
					rlog.LogKeyConsumerGroup: dc.consumerGroup,
					rlog.LogKeyTopic:         topic,
					rlog.LogKeyMessageQueue:  fmt.Sprintf("%v", mqs),
				})
			}
		case Clustering:
			cidAll := dc.findConsumerList(topic)
			if cidAll == nil {
				rlog.Warning("do balance in group failed, get consumer id list failed", map[string]interface{}{
					rlog.LogKeyConsumerGroup: dc.consumerGroup,
					rlog.LogKeyTopic:         topic,
				})
				return true
			}
			// Sort a private copy so the slice stored in topicSubscribeInfoTable is not reordered.
			mqAll := make([]*primitive.MessageQueue, len(mqs))
			copy(mqAll, mqs)
			sort.Strings(cidAll)
			// Order queues by topic, then broker name, then queue id.
			sort.SliceStable(mqAll, func(i, j int) bool {
				if c := strings.Compare(mqAll[i].Topic, mqAll[j].Topic); c != 0 {
					return c < 0
				}
				if c := strings.Compare(mqAll[i].BrokerName, mqAll[j].BrokerName); c != 0 {
					return c < 0
				}
				// Compare directly instead of testing the sign of a subtraction,
				// which can overflow for extreme values.
				return mqAll[i].QueueId < mqAll[j].QueueId
			})
			allocateResult := dc.allocate(dc.consumerGroup, dc.client.ClientID(), mqAll, cidAll)
			// Principle of flow control: pull TPS = 1000ms/PullInterval * BatchSize * len(allocateResult)
			if consumeTPS := dc.option.ConsumeTPS.Load(); consumeTPS > 0 && len(allocateResult) > 0 {
				// A non-positive batch size would yield an infinite or negative pull
				// rate and therefore a zero or negative interval, so leave
				// PullInterval untouched in that case.
				if pullBatchSize := dc.option.PullBatchSize.Load(); pullBatchSize > 0 {
					// Multiply in float64 so batch size * queue count cannot wrap int32.
					messagesPerRound := float64(pullBatchSize) * float64(len(allocateResult))
					pullTimesPerSecond := float64(consumeTPS) / messagesPerRound
					dc.option.PullInterval.Store(time.Duration(float64(time.Second) / pullTimesPerSecond))
				}
			}
			changed := dc.updateProcessQueueTable(topic, allocateResult)
			if changed {
				dc.mqChanged(topic, mqAll, allocateResult)
				rlog.Info("MessageQueue do balance done", map[string]interface{}{
					rlog.LogKeyConsumerGroup: dc.consumerGroup,
					rlog.LogKeyTopic:         topic,
					"clientID":               dc.client.ClientID(),
					"mqAllSize":              len(mqAll),
					"cidAllSize":             len(cidAll),
					"rebalanceResultSize":    len(allocateResult),
					"rebalanceResultSet":     allocateResult,
				})
			}
		}
		return true
	})
	dc.truncateMessageQueueNotMyTopic()
}