func AllocateByMachineRoom()

in consumer/strategy.go [149:201]


func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy {
	return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
		if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
			return nil
		}

		var (
			find  bool
			index int
		)
		for idx := range cidAll {
			if cidAll[idx] == currentCID {
				find = true
				index = idx
				break
			}
		}
		if !find {
			rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
				rlog.LogKeyConsumerGroup: consumerGroup,
				"consumerId":             currentCID,
				"cidAll":                 cidAll,
			})
			return nil
		}

		var premqAll []*primitive.MessageQueue
		for _, mq := range mqAll {
			temp := strings.Split(mq.BrokerName, "@")
			if len(temp) == 2 {
				for _, idc := range consumeridcs {
					if idc == temp[0] {
						premqAll = append(premqAll, mq)
					}
				}
			}
		}

		mod := len(premqAll) / len(cidAll)
		rem := len(premqAll) % len(cidAll)
		startIndex := mod * index
		endIndex := startIndex + mod

		result := make([]*primitive.MessageQueue, 0)
		for i := startIndex; i < endIndex; i++ {
			result = append(result, mqAll[i])
		}
		if rem > index {
			result = append(result, premqAll[index+mod*len(cidAll)])
		}
		return result
	}
}