func()

in golang/loadBalancer.go [65:120]


func (plb *publishingLoadBalancer) TakeMessageQueues(excluded sync.Map, count int) ([]*v2.MessageQueue, error) {
	if len(plb.messageQueues) == 0 {
		return nil, fmt.Errorf("messageQueues is empty")
	}
	next := atomic.AddInt32(&plb.index, 1)
	var candidates []*v2.MessageQueue
	candidateBrokerNames := make(map[string]bool, 32)

	for i := 0; i < len(plb.messageQueues); i++ {
		idx := utils.Mod(next+1, len(plb.messageQueues))
		selectMessageQueue := plb.messageQueues[idx]
		broker := selectMessageQueue.Broker
		brokerName := broker.GetName()

		if _, ok := candidateBrokerNames[brokerName]; ok {
			continue
		}

		pass := false
		for _, address := range broker.GetEndpoints().GetAddresses() {
			if _, ok := excluded.Load(utils.ParseAddress(address)); ok {
				pass = true
				break
			}
		}
		if pass {
			continue
		}

		candidates = append(candidates, selectMessageQueue)
		candidateBrokerNames[brokerName] = true

		if len(candidates) >= count {
			return candidates, nil
		}
	}
	if len(candidates) == 0 {
		for i := 0; i < len(plb.messageQueues); i++ {
			idx := utils.Mod(next+1, len(plb.messageQueues))
			selectMessageQueue := plb.messageQueues[idx]
			broker := selectMessageQueue.Broker
			brokerName := broker.GetName()

			if _, ok := candidateBrokerNames[brokerName]; ok {
				continue
			}
			candidates = append(candidates, selectMessageQueue)
			candidateBrokerNames[brokerName] = true

			if len(candidates) >= count {
				return candidates, nil
			}
		}
	}
	return candidates, nil
}