hostpool/hostpool.go (101 lines of code) (raw):

package hostpool import ( "sync" "github.com/hailocab/go-hostpool" "github.com/gocql/gocql" ) // HostPoolHostPolicy is a host policy which uses the bitly/go-hostpool library // to distribute queries between hosts and prevent sending queries to // unresponsive hosts. When creating the host pool that is passed to the policy // use an empty slice of hosts as the hostpool will be populated later by gocql. // See below for examples of usage: // // // Create host selection policy using a simple host pool // cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil)) // // // Create host selection policy using an epsilon greedy pool // cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy( // hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}), // ) func HostPoolHostPolicy(hp hostpool.HostPool) *hostPoolHostPolicy { return &hostPoolHostPolicy{hostMap: map[string]*gocql.HostInfo{}, hp: hp} } type hostPoolHostPolicy struct { hp hostpool.HostPool mu sync.RWMutex hostMap map[string]*gocql.HostInfo } func (r *hostPoolHostPolicy) Init(*gocql.Session) {} func (r *hostPoolHostPolicy) KeyspaceChanged(gocql.KeyspaceUpdateEvent) {} func (r *hostPoolHostPolicy) SetPartitioner(string) {} func (r *hostPoolHostPolicy) IsLocal(*gocql.HostInfo) bool { return true } func (r *hostPoolHostPolicy) SetHosts(hosts []*gocql.HostInfo) { peers := make([]string, len(hosts)) hostMap := make(map[string]*gocql.HostInfo, len(hosts)) for i, host := range hosts { ip := host.ConnectAddress().String() peers[i] = ip hostMap[ip] = host } r.mu.Lock() r.hp.SetHosts(peers) r.hostMap = hostMap r.mu.Unlock() } func (r *hostPoolHostPolicy) AddHost(host *gocql.HostInfo) { ip := host.ConnectAddress().String() r.mu.Lock() defer r.mu.Unlock() // If the host addr is present and isn't nil return if h, ok := r.hostMap[ip]; ok && h != nil { return } // otherwise, add the host to the map r.hostMap[ip] = host // and construct a new peer list to give to the HostPool hosts := make([]string, 0, len(r.hostMap)) for addr := range r.hostMap { hosts = append(hosts, addr) } r.hp.SetHosts(hosts) } func (r *hostPoolHostPolicy) RemoveHost(host *gocql.HostInfo) { ip := host.ConnectAddress().String() r.mu.Lock() defer r.mu.Unlock() if _, ok := r.hostMap[ip]; !ok { return } delete(r.hostMap, ip) hosts := make([]string, 0, len(r.hostMap)) for _, host := range r.hostMap { hosts = append(hosts, host.ConnectAddress().String()) } r.hp.SetHosts(hosts) } func (r *hostPoolHostPolicy) HostUp(host *gocql.HostInfo) { r.AddHost(host) } func (r *hostPoolHostPolicy) HostDown(host *gocql.HostInfo) { r.RemoveHost(host) } func (r *hostPoolHostPolicy) Pick(qry gocql.ExecutableQuery) gocql.NextHost { return func() gocql.SelectedHost { r.mu.RLock() defer r.mu.RUnlock() if len(r.hostMap) == 0 { return nil } hostR := r.hp.Get() host, ok := r.hostMap[hostR.Host()] if !ok { return nil } return selectedHostPoolHost{ policy: r, info: host, hostR: hostR, } } } // selectedHostPoolHost is a host returned by the hostPoolHostPolicy and // implements the SelectedHost interface type selectedHostPoolHost struct { policy *hostPoolHostPolicy info *gocql.HostInfo hostR hostpool.HostPoolResponse } func (host selectedHostPoolHost) Info() *gocql.HostInfo { return host.info } func (host selectedHostPoolHost) Mark(err error) { ip := host.info.ConnectAddress().String() host.policy.mu.RLock() defer host.policy.mu.RUnlock() if _, ok := host.policy.hostMap[ip]; !ok { // host was removed between pick and mark return } host.hostR.Mark(err) }