in consumer/strategy.go [203:247]
func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy {
return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
return nil
}
var (
find bool
)
for idx := range cidAll {
if cidAll[idx] == currentCID {
find = true
break
}
}
if !find {
rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
rlog.LogKeyConsumerGroup: consumerGroup,
"consumerId": currentCID,
"cidAll": cidAll,
})
return nil
}
c := consistent.New()
c.NumberOfReplicas = virtualNodeCnt
for _, cid := range cidAll {
c.Add(cid)
}
result := make([]*primitive.MessageQueue, 0)
for _, mq := range mqAll {
clientNode, err := c.Get(mq.String())
if err != nil {
rlog.Warning("[BUG] AllocateByConsistentHash err: %s", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
}
if currentCID == clientNode {
result = append(result, mq)
}
}
return result
}
}