func()

in consumer/push_consumer.go [494:541]


func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
	v, exit := pc.subscriptionDataTable.Load(topic)
	if !exit {
		return
	}
	data := v.(*internal.SubscriptionData)
	newVersion := time.Now().UnixNano()
	rlog.Info("the MessageQueue changed, version also updated", map[string]interface{}{
		rlog.LogKeyValueChangedFrom: data.SubVersion,
		rlog.LogKeyValueChangedTo:   newVersion,
	})
	data.Lock()
	data.SubVersion = newVersion
	data.Unlock()

	// TODO: optimize
	count := 0
	pc.processQueueTable.Range(func(key, value interface{}) bool {
		count++
		return true
	})
	if count > 0 {
		if pc.option.PullThresholdForTopic != -1 {
			newVal := pc.option.PullThresholdForTopic / count
			if newVal == 0 {
				newVal = 1
			}
			rlog.Info("The PullThresholdForQueue is changed", map[string]interface{}{
				rlog.LogKeyValueChangedFrom: pc.option.PullThresholdForQueue.Load(),
				rlog.LogKeyValueChangedTo:   newVal,
			})
			pc.option.PullThresholdForQueue.Store(int64(newVal))
		}

		if pc.option.PullThresholdSizeForTopic != -1 {
			newVal := pc.option.PullThresholdSizeForTopic / count
			if newVal == 0 {
				newVal = 1
			}
			rlog.Info("The PullThresholdSizeForQueue is changed", map[string]interface{}{
				rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForQueue.Load(),
				rlog.LogKeyValueChangedTo:   newVal,
			})
			pc.option.PullThresholdSizeForQueue.Store(int32(newVal))
		}
	}
	pc.client.SendHeartbeatToAllBrokerWithLock()
}