func()

in consumer/consumer.go [383:456]


func (dc *defaultConsumer) doBalance() {
	dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
		topic := key.(string)
		v, exist := dc.topicSubscribeInfoTable.Load(topic)
		if !exist {
			rlog.Warning("do balance in group failed, the topic does not exist", map[string]interface{}{
				rlog.LogKeyConsumerGroup: dc.consumerGroup,
				rlog.LogKeyTopic:         topic,
			})
			return true
		}
		mqs := v.([]*primitive.MessageQueue)
		switch dc.model {
		case BroadCasting:
			changed := dc.updateProcessQueueTable(topic, mqs)
			if changed {
				dc.mqChanged(topic, mqs, mqs)
				rlog.Debug("MessageQueue changed", map[string]interface{}{
					rlog.LogKeyConsumerGroup: dc.consumerGroup,
					rlog.LogKeyTopic:         topic,
					rlog.LogKeyMessageQueue:  fmt.Sprintf("%v", mqs),
				})
			}
		case Clustering:
			cidAll := dc.findConsumerList(topic)
			if cidAll == nil {
				rlog.Warning("do balance in group failed, get consumer id list failed", map[string]interface{}{
					rlog.LogKeyConsumerGroup: dc.consumerGroup,
					rlog.LogKeyTopic:         topic,
				})
				return true
			}
			mqAll := make([]*primitive.MessageQueue, len(mqs))
			copy(mqAll, mqs)
			sort.Strings(cidAll)
			sort.SliceStable(mqAll, func(i, j int) bool {
				v := strings.Compare(mqAll[i].Topic, mqAll[j].Topic)
				if v != 0 {
					return v < 0
				}

				v = strings.Compare(mqAll[i].BrokerName, mqAll[j].BrokerName)
				if v != 0 {
					return v < 0
				}
				return (mqAll[i].QueueId - mqAll[j].QueueId) < 0
			})
			allocateResult := dc.allocate(dc.consumerGroup, dc.client.ClientID(), mqAll, cidAll)

			// Principle of flow control: pull TPS = 1000ms/PullInterval * BatchSize * len(allocateResult)
			if consumeTPS := dc.option.ConsumeTPS.Load(); consumeTPS > 0 && len(allocateResult) > 0 {
				pullBatchSize := dc.option.PullBatchSize.Load()
				pullTimesPerSecond := float64(consumeTPS) / float64(pullBatchSize*int32(len(allocateResult)))
				dc.option.PullInterval.Store(time.Duration(float64(time.Second) / pullTimesPerSecond))
			}

			changed := dc.updateProcessQueueTable(topic, allocateResult)
			if changed {
				dc.mqChanged(topic, mqAll, allocateResult)
				rlog.Info("MessageQueue do balance done", map[string]interface{}{
					rlog.LogKeyConsumerGroup: dc.consumerGroup,
					rlog.LogKeyTopic:         topic,
					"clientID":               dc.client.ClientID(),
					"mqAllSize":              len(mqAll),
					"cidAllSize":             len(cidAll),
					"rebalanceResultSize":    len(allocateResult),
					"rebalanceResultSet":     allocateResult,
				})
			}
		}
		return true
	})
	dc.truncateMessageQueueNotMyTopic()
}