func AllocateByConsistentHash()

in consumer/strategy.go [203:247]


// AllocateByConsistentHash returns an AllocateStrategy that distributes message
// queues among consumers with a consistent-hash ring built from cidAll.
//
// virtualNodeCnt is the number of virtual nodes (replicas) placed on the ring
// for each consumer ID. A non-positive value leaves the consistent package's
// default replica count in place rather than configuring a ring without any
// virtual nodes.
//
// The returned strategy yields nil when currentCID is empty, when mqAll or
// cidAll is empty, or when currentCID does not appear in cidAll. Otherwise it
// returns a non-nil (possibly empty) slice holding, in mqAll order, the queues
// whose ring lookup resolves to currentCID.
func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy {
	return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
		if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
			return nil
		}

		// The current consumer must be a member of the group being balanced.
		found := false
		for _, cid := range cidAll {
			if cid == currentCID {
				found = true
				break
			}
		}
		if !found {
			rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
				rlog.LogKeyConsumerGroup: consumerGroup,
				"consumerId":             currentCID,
				"cidAll":                 cidAll,
			})
			return nil
		}

		c := consistent.New()
		// Only override the replica count with a usable value: with a
		// non-positive count no virtual nodes would be added for any consumer,
		// every lookup below would fail, and no queue would be allocated.
		if virtualNodeCnt > 0 {
			c.NumberOfReplicas = virtualNodeCnt
		}
		for _, cid := range cidAll {
			c.Add(cid)
		}

		// Kept as an explicitly empty (non-nil) slice so callers can tell
		// "nothing allocated" apart from the nil returned on invalid input.
		result := make([]*primitive.MessageQueue, 0)
		for _, mq := range mqAll {
			clientNode, err := c.Get(mq.String())
			if err != nil {
				rlog.Warning("[BUG] AllocateByConsistentHash err", map[string]interface{}{
					rlog.LogKeyUnderlayError: err,
				})
				// clientNode carries no meaning when the lookup failed.
				continue
			}
			if clientNode == currentCID {
				result = append(result, mq)
			}
		}
		return result
	}
}