func()

in consumer/push_consumer.go [494:539]


// messageQueueChanged reacts to a change in the message queues assigned to
// this consumer for topic.
//
// If topic has no entry in subscriptionDataTable the call is a no-op.
// Otherwise it stamps the entry's SubVersion with the current time in
// nanoseconds, re-derives the queue-level pull thresholds from the
// topic-level ones, and finally calls SendHeartbeatToAllBrokerWithLock on
// the client.
//
// A topic-level threshold of -1 is skipped. For any other value the
// threshold is divided by the number of entries in processQueueTable
// (clamped to a minimum of 1) and stored as the matching queue-level
// threshold. The topic-level settings themselves are never modified, so
// repeated calls always divide the configured value rather than a
// previously divided one.
//
// mqAll and mqDivided are currently unused.
func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
	v, ok := pc.subscriptionDataTable.Load(topic)
	if !ok {
		return
	}
	data := v.(*internal.SubscriptionData)
	newVersion := time.Now().UnixNano()
	rlog.Info("the MessageQueue changed, version also updated", map[string]interface{}{
		rlog.LogKeyValueChangedFrom: data.SubVersion,
		rlog.LogKeyValueChangedTo:   newVersion,
	})
	data.SubVersion = newVersion

	// TODO: optimize
	// Count every entry currently held in processQueueTable.
	count := 0
	pc.processQueueTable.Range(func(_, _ interface{}) bool {
		count++
		return true
	})
	if count > 0 {
		if pc.option.PullThresholdForTopic != -1 {
			// Integer division may yield 0 when there are more queues than
			// the topic budget; keep at least 1 per queue.
			newVal := pc.option.PullThresholdForTopic / count
			if newVal == 0 {
				newVal = 1
			}
			rlog.Info("The PullThresholdForQueue is changed", map[string]interface{}{
				rlog.LogKeyValueChangedFrom: pc.option.PullThresholdForQueue,
				rlog.LogKeyValueChangedTo:   newVal,
			})
			// Store the share on the queue-level field. Writing it back to
			// PullThresholdForTopic would make the next call divide an
			// already divided value.
			// NOTE(review): the conversion assumes PullThresholdForQueue is
			// declared as int64 — confirm against the option struct.
			pc.option.PullThresholdForQueue = int64(newVal)
		}

		if pc.option.PullThresholdSizeForTopic != -1 {
			newVal := pc.option.PullThresholdSizeForTopic / count
			if newVal == 0 {
				newVal = 1
			}
			rlog.Info("The PullThresholdSizeForQueue is changed", map[string]interface{}{
				rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForQueue,
				rlog.LogKeyValueChangedTo:   newVal,
			})
			// NOTE(review): assumes PullThresholdSizeForQueue has the same
			// type as PullThresholdSizeForTopic — confirm against the option
			// struct.
			pc.option.PullThresholdSizeForQueue = newVal
		}
	}
	pc.client.SendHeartbeatToAllBrokerWithLock()
}