in consumer/strategy.go [46:100]
// AllocateByAveragely returns the contiguous run of queues from mqAll that is
// assigned to currentCID when the queues are divided as evenly as possible
// among the consumers listed in cidAll.
//
// A consumer's share is decided by its position in cidAll. When len(mqAll) is
// not a multiple of len(cidAll), the first len(mqAll)%len(cidAll) consumers
// each receive one additional queue. A consumer whose position is at or beyond
// len(mqAll) receives an empty, non-nil slice.
//
// The result is nil when currentCID is empty, when mqAll or cidAll is empty,
// or when currentCID does not appear in cidAll; the last case also logs a
// warning that includes consumerGroup.
func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue {
	if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
		return nil
	}

	// Position of this consumer within cidAll; stays -1 if it is not listed.
	pos := -1
	for i, cid := range cidAll {
		if cid == currentCID {
			pos = i
			break
		}
	}
	if pos < 0 {
		rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
			rlog.LogKeyConsumerGroup: consumerGroup,
			"consumerId":             currentCID,
			"cidAll":                 cidAll,
		})
		return nil
	}

	queueCount, consumerCount := len(mqAll), len(cidAll)
	remainder := queueCount % consumerCount
	// The leading `remainder` consumers absorb the queues left over after an
	// even split.
	getsExtra := remainder > 0 && pos < remainder

	// With no more queues than consumers, nobody gets more than one queue.
	share := 1
	if queueCount > consumerCount {
		share = queueCount / consumerCount
		if getsExtra {
			share++
		}
	}

	// Consumers past the "extra" group start after all the extra queues that
	// were handed out ahead of them.
	begin := pos * share
	if !getsExtra {
		begin += remainder
	}

	// count drops to zero or below when begin lies at or past the end of
	// mqAll; such a consumer gets no queues.
	count := utils.MinInt(share, queueCount-begin)
	assigned := make([]*primitive.MessageQueue, 0)
	if count > 0 {
		// count <= queueCount-begin, so the window never runs past mqAll.
		assigned = append(assigned, mqAll[begin:begin+count]...)
	}
	return assigned
}