pkg/remoting/loadbalance/consistent_hash_loadbalance.go (115 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package loadbalance import ( "crypto/md5" "fmt" "sort" "sync" getty "github.com/apache/dubbo-getty" ) var ( once sync.Once defaultVirtualNodeNumber = 10 consistentInstance *Consistent ) type Consistent struct { sync.RWMutex virtualNodeCount int // consistent hashCircle hashCircle map[int64]getty.Session sortedHashNodes []int64 } func (c *Consistent) put(key int64, session getty.Session) { c.Lock() defer c.Unlock() c.hashCircle[key] = session } func (c *Consistent) hash(key string) int64 { hashByte := md5.Sum([]byte(key)) var res int64 for i := 0; i < 4; i++ { res <<= 8 res |= int64(hashByte[i]) & 0xff } return res } // pick get a node func (c *Consistent) pick(sessions *sync.Map, key string) getty.Session { hashKey := c.hash(key) index := sort.Search(len(c.sortedHashNodes), func(i int) bool { return c.sortedHashNodes[i] >= hashKey }) if index == len(c.sortedHashNodes) { return RandomLoadBalance(sessions, key) } c.RLock() session, ok := c.hashCircle[c.sortedHashNodes[index]] if !ok { c.RUnlock() return RandomLoadBalance(sessions, key) } c.RUnlock() if session.IsClosed() { go c.refreshHashCircle(sessions) return c.firstKey() } return session } // refreshHashCircle refresh hashCircle func (c *Consistent) refreshHashCircle(sessions *sync.Map) { var sortedHashNodes []int64 hashCircle := make(map[int64]getty.Session) var session getty.Session sessions.Range(func(key, value interface{}) bool { session = key.(getty.Session) for i := 0; i < defaultVirtualNodeNumber; i++ { if !session.IsClosed() { position := c.hash(fmt.Sprintf("%s%d", session.RemoteAddr(), i)) hashCircle[position] = session sortedHashNodes = append(sortedHashNodes, position) } else { sessions.Delete(key) } } return true }) // virtual node sort sort.Slice(sortedHashNodes, func(i, j int) bool { return sortedHashNodes[i] < sortedHashNodes[j] }) c.sortedHashNodes = sortedHashNodes c.hashCircle = hashCircle } func (c *Consistent) firstKey() getty.Session { c.RLock() defer c.RUnlock() if len(c.sortedHashNodes) > 0 { return c.hashCircle[c.sortedHashNodes[0]] } return nil } func newConsistenceInstance(sessions *sync.Map) *Consistent { once.Do(func() { consistentInstance = &Consistent{ hashCircle: make(map[int64]getty.Session), } // construct hash circle sessions.Range(func(key, value interface{}) bool { session := key.(getty.Session) for i := 0; i < defaultVirtualNodeNumber; i++ { if !session.IsClosed() { position := consistentInstance.hash(fmt.Sprintf("%s%d", session.RemoteAddr(), i)) consistentInstance.put(position, session) consistentInstance.sortedHashNodes = append(consistentInstance.sortedHashNodes, position) } else { sessions.Delete(key) } } return true }) // virtual node sort sort.Slice(consistentInstance.sortedHashNodes, func(i, j int) bool { return consistentInstance.sortedHashNodes[i] < consistentInstance.sortedHashNodes[j] }) }) return consistentInstance } func ConsistentHashLoadBalance(sessions *sync.Map, xid string) getty.Session { if consistentInstance == nil { newConsistenceInstance(sessions) } // pick a node return consistentInstance.pick(sessions, xid) }