in golang/loadBalancer.go [66:121]
// TakeMessageQueues selects up to count message queues, at most one per broker
// name, by walking plb.messageQueues in round-robin order. The starting slot is
// derived from plb.index, which is atomically advanced on every call so that
// successive calls begin at different queues.
//
// A queue is skipped on the first pass when any of its broker's endpoint
// addresses, converted with utils.ParseAddress, is present as a key in
// excluded. If that pass yields no candidate at all, a second pass ignores
// excluded so the caller still receives queues to try.
//
// The returned slice may hold fewer than count queues when there are not
// enough distinct eligible brokers. Because the limit is checked after each
// append, at least one queue is returned even when count <= 0. An error is
// returned only when the load balancer holds no message queues.
//
// NOTE(review): excluded is a sync.Map received by value; the sync package
// documents that a Map must not be copied after first use. The signature is
// kept as-is for caller compatibility — consider switching to *sync.Map.
func (plb *publishingLoadBalancer) TakeMessageQueues(excluded sync.Map, count int) ([]*v2.MessageQueue, error) {
	total := len(plb.messageQueues)
	if total == 0 {
		return nil, fmt.Errorf("messageQueues is empty")
	}

	next := atomic.AddInt32(&plb.index, 1)
	// Resolve the starting slot once, then advance with plain int arithmetic so
	// every queue is visited exactly once per pass. Previously the index was
	// recomputed from the same value on each iteration, so only one queue was
	// ever examined.
	start := int(utils.Mod(next+1, total))

	var candidates []*v2.MessageQueue
	seenBrokers := make(map[string]bool, 32)

	// Pass 1 honours the exclusion set; pass 2 is the fallback that ignores it.
	for _, honorExcluded := range [...]bool{true, false} {
		for i := 0; i < total; i++ {
			mq := plb.messageQueues[(start+i)%total]
			broker := mq.Broker
			brokerName := broker.GetName()
			if seenBrokers[brokerName] {
				// Only one queue per broker is handed out.
				continue
			}

			if honorExcluded {
				isolated := false
				for _, address := range broker.GetEndpoints().GetAddresses() {
					if _, ok := excluded.Load(utils.ParseAddress(address)); ok {
						isolated = true
						break
					}
				}
				if isolated {
					continue
				}
			}

			candidates = append(candidates, mq)
			seenBrokers[brokerName] = true
			if len(candidates) >= count {
				return candidates, nil
			}
		}
		if len(candidates) > 0 {
			// Some eligible queues were found; do not fall back to excluded ones.
			break
		}
	}
	return candidates, nil
}