ring.go (115 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
* Copyright (c) 2016, The Gocql authors,
* provided under the BSD-3-Clause License.
* See the NOTICE file distributed with this work for additional information.
*/
package gocql
import (
"fmt"
"sync"
"sync/atomic"
)
type ring struct {
// endpoints are the set of endpoints which the driver will attempt to connect
// to in the case it can not reach any of its hosts. They are also used to boot
// strap the initial connection.
endpoints []*HostInfo
mu sync.RWMutex
// hosts are the set of all hosts in the cassandra ring that we know of.
// key of map is host_id.
hosts map[string]*HostInfo
// hostIPToUUID maps host native address to host_id.
hostIPToUUID map[string]string
hostList []*HostInfo
pos uint32
// TODO: we should store the ring metadata here also.
}
func (r *ring) rrHost() *HostInfo {
r.mu.RLock()
defer r.mu.RUnlock()
if len(r.hostList) == 0 {
return nil
}
pos := int(atomic.AddUint32(&r.pos, 1) - 1)
return r.hostList[pos%len(r.hostList)]
}
func (r *ring) getHostByIP(ip string) (*HostInfo, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
hi, ok := r.hostIPToUUID[ip]
return r.hosts[hi], ok
}
func (r *ring) getHost(hostID string) *HostInfo {
r.mu.RLock()
host := r.hosts[hostID]
r.mu.RUnlock()
return host
}
func (r *ring) allHosts() []*HostInfo {
r.mu.RLock()
hosts := make([]*HostInfo, 0, len(r.hosts))
for _, host := range r.hosts {
hosts = append(hosts, host)
}
r.mu.RUnlock()
return hosts
}
func (r *ring) currentHosts() map[string]*HostInfo {
r.mu.RLock()
hosts := make(map[string]*HostInfo, len(r.hosts))
for k, v := range r.hosts {
hosts[k] = v
}
r.mu.RUnlock()
return hosts
}
func (r *ring) addOrUpdate(host *HostInfo) *HostInfo {
if existingHost, ok := r.addHostIfMissing(host); ok {
existingHost.update(host)
host = existingHost
}
return host
}
func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
if host.invalidConnectAddr() {
panic(fmt.Sprintf("invalid host: %v", host))
}
hostID := host.HostID()
r.mu.Lock()
if r.hosts == nil {
r.hosts = make(map[string]*HostInfo)
}
if r.hostIPToUUID == nil {
r.hostIPToUUID = make(map[string]string)
}
existing, ok := r.hosts[hostID]
if !ok {
r.hosts[hostID] = host
r.hostIPToUUID[host.nodeToNodeAddress().String()] = hostID
existing = host
r.hostList = append(r.hostList, host)
}
r.mu.Unlock()
return existing, ok
}
func (r *ring) removeHost(hostID string) bool {
r.mu.Lock()
if r.hosts == nil {
r.hosts = make(map[string]*HostInfo)
}
if r.hostIPToUUID == nil {
r.hostIPToUUID = make(map[string]string)
}
h, ok := r.hosts[hostID]
if ok {
for i, host := range r.hostList {
if host.HostID() == hostID {
r.hostList = append(r.hostList[:i], r.hostList[i+1:]...)
break
}
}
delete(r.hostIPToUUID, h.nodeToNodeAddress().String())
}
delete(r.hosts, hostID)
r.mu.Unlock()
return ok
}
type clusterMetadata struct {
mu sync.RWMutex
partitioner string
}
func (c *clusterMetadata) setPartitioner(partitioner string) {
c.mu.Lock()
defer c.mu.Unlock()
if c.partitioner != partitioner {
// TODO: update other things now
c.partitioner = partitioner
}
}