in consumer/push_consumer.go [494:539]
// messageQueueChanged handles a change in the message queues for topic.
//
// When topic has no entry in subscriptionDataTable the call is a no-op.
// Otherwise it:
//  1. stamps the subscription data with a new SubVersion taken from the
//     current time in nanoseconds,
//  2. counts the entries in processQueueTable and, if there are any, divides
//     each topic-level pull threshold that is not -1 by that count (a result
//     of 0 is raised to 1) and stores the result,
//  3. sends a heartbeat to all brokers.
//
// mqAll and mqDivided are not read by this implementation.
//
// NOTE(review): the divided values are stored back into the same
// PullThresholdForTopic / PullThresholdSizeForTopic options they were read
// from, so each invocation divides the previously stored result again.
// Confirm whether a separate per-queue setting was intended as the target.
func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
	entry, found := pc.subscriptionDataTable.Load(topic)
	if !found {
		return
	}

	subData := entry.(*internal.SubscriptionData)
	version := time.Now().UnixNano()
	rlog.Info("the MessageQueue changed, version also updated", map[string]interface{}{
		rlog.LogKeyValueChangedFrom: subData.SubVersion,
		rlog.LogKeyValueChangedTo:   version,
	})
	subData.SubVersion = version

	// TODO: optimize
	// Walk the whole table just to learn how many entries it holds.
	queueCount := 0
	pc.processQueueTable.Range(func(_, _ interface{}) bool {
		queueCount++
		return true
	})

	if queueCount > 0 {
		// -1 means the threshold is left alone.
		if total := pc.option.PullThresholdForTopic; total != -1 {
			perQueue := total / queueCount
			if perQueue == 0 {
				// Integer division rounded down to zero; keep at least 1.
				perQueue = 1
			}
			rlog.Info("The PullThresholdForTopic is changed", map[string]interface{}{
				rlog.LogKeyValueChangedFrom: total,
				rlog.LogKeyValueChangedTo:   perQueue,
			})
			pc.option.PullThresholdForTopic = perQueue
		}

		if totalSize := pc.option.PullThresholdSizeForTopic; totalSize != -1 {
			perQueueSize := totalSize / queueCount
			if perQueueSize == 0 {
				perQueueSize = 1
			}
			rlog.Info("The PullThresholdSizeForTopic is changed", map[string]interface{}{
				rlog.LogKeyValueChangedFrom: totalSize,
				rlog.LogKeyValueChangedTo:   perQueueSize,
			})
			pc.option.PullThresholdSizeForTopic = perQueueSize
		}
	}

	// The heartbeat is sent whether or not any threshold was adjusted.
	pc.client.SendHeartbeatToAllBrokerWithLock()
}