in consumer/strategy.go [149:201]
// AllocateByMachineRoom returns an AllocateStrategy that only hands out
// message queues hosted on brokers belonging to one of the given IDCs
// (machine rooms).
//
// A queue is eligible when its BrokerName splits on "@" into exactly two
// parts and the first part equals one of consumeridcs. The eligible queues
// are then divided among the consumers in cidAll: each consumer receives a
// contiguous run of len(eligible)/len(cidAll) queues, and the first
// len(eligible)%len(cidAll) consumers receive one extra queue taken from the
// tail of the eligible list.
//
// The returned strategy yields nil when currentCID is empty, when mqAll or
// cidAll is empty, or when currentCID is not present in cidAll (a warning is
// logged in that last case). Otherwise it returns a non-nil, possibly empty,
// slice.
func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy {
	return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
		if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
			return nil
		}

		// Position of the current consumer in cidAll; -1 means "not found".
		index := -1
		for idx := range cidAll {
			if cidAll[idx] == currentCID {
				index = idx
				break
			}
		}
		if index < 0 {
			rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
				rlog.LogKeyConsumerGroup: consumerGroup,
				"consumerId":             currentCID,
				"cidAll":                 cidAll,
			})
			return nil
		}

		// Keep only the queues whose broker name is "<idc>@<rest>" with an
		// IDC listed in consumeridcs.
		var premqAll []*primitive.MessageQueue
		for _, mq := range mqAll {
			temp := strings.Split(mq.BrokerName, "@")
			if len(temp) != 2 {
				continue
			}
			for _, idc := range consumeridcs {
				if idc == temp[0] {
					premqAll = append(premqAll, mq)
					// Stop at the first match so a duplicated IDC in
					// consumeridcs cannot add the same queue twice.
					break
				}
			}
		}

		mod := len(premqAll) / len(cidAll)
		rem := len(premqAll) % len(cidAll)
		startIndex := mod * index
		endIndex := startIndex + mod

		result := make([]*primitive.MessageQueue, 0, mod+1)
		// The even share must come from the filtered list: startIndex and
		// endIndex are offsets into premqAll, not into mqAll.
		result = append(result, premqAll[startIndex:endIndex]...)
		// Leftover queues sit after the mod*len(cidAll) evenly distributed
		// ones; the first rem consumers take one each.
		if rem > index {
			result = append(result, premqAll[index+mod*len(cidAll)])
		}
		return result
	}
}