in policies.go [579:689]
func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
if qry == nil {
return t.fallback.Pick(qry)
}
routingKey, err := qry.GetRoutingKey()
if err != nil {
return t.fallback.Pick(qry)
} else if routingKey == nil {
return t.fallback.Pick(qry)
}
meta := t.getMetadataReadOnly()
if meta == nil || meta.tokenRing == nil {
return t.fallback.Pick(qry)
}
token := meta.tokenRing.partitioner.Hash(routingKey)
ht := meta.replicas[qry.Keyspace()].replicasFor(token)
var replicas []*HostInfo
if ht == nil {
host, _ := meta.tokenRing.GetHostForToken(token)
replicas = []*HostInfo{host}
} else {
replicas = ht.hosts
if t.shuffleReplicas {
replicas = shuffleHosts(replicas)
}
}
var (
fallbackIter NextHost
i, j, k int
remote [][]*HostInfo
tierer HostTierer
tiererOk bool
maxTier uint
)
if tierer, tiererOk = t.fallback.(HostTierer); tiererOk {
maxTier = tierer.MaxHostTier()
} else {
maxTier = 1
}
if t.nonLocalReplicasFallback {
remote = make([][]*HostInfo, maxTier)
}
used := make(map[*HostInfo]bool, len(replicas))
return func() SelectedHost {
for i < len(replicas) {
h := replicas[i]
i++
var tier uint
if tiererOk {
tier = tierer.HostTier(h)
} else if t.fallback.IsLocal(h) {
tier = 0
} else {
tier = 1
}
if tier != 0 {
if t.nonLocalReplicasFallback {
remote[tier-1] = append(remote[tier-1], h)
}
continue
}
if h.IsUp() {
used[h] = true
return (*selectedHost)(h)
}
}
if t.nonLocalReplicasFallback {
for j < len(remote) && k < len(remote[j]) {
h := remote[j][k]
k++
if k >= len(remote[j]) {
j++
k = 0
}
if h.IsUp() {
used[h] = true
return (*selectedHost)(h)
}
}
}
if fallbackIter == nil {
// fallback
fallbackIter = t.fallback.Pick(qry)
}
// filter the token aware selected hosts from the fallback hosts
for fallbackHost := fallbackIter(); fallbackHost != nil; fallbackHost = fallbackIter() {
if !used[fallbackHost.Info()] {
used[fallbackHost.Info()] = true
return fallbackHost
}
}
return nil
}
}